From 7f064523d8f2a93fd3699ec272283ca94083dc00 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Fri, 3 Sep 2021 21:23:26 +0800 Subject: [PATCH] prefect combine async sync --- .../Controllers/ValuesController.cs | 43 ++-- .../SysUserModPaginationConfiguration.cs | 5 +- .../SysUserSalaryPaginationConfiguration.cs | 10 +- .../Abstractions/IStreamMergeEngine.cs | 19 -- .../IStreamMergeAsyncEnumerator.cs | 5 +- ...MemoryReverseStreamMergeAsyncEnumerator.cs | 49 ++++- ...ggregateOrderStreamMergeAsyncEnumerator.cs | 100 ++++++++-- .../MultiOrderStreamMergeAsyncEnumerator.cs | 43 +++- .../OrderStreamMergeAsyncEnumerator.cs | 26 ++- .../PaginationStreamMergeAsyncEnumerator.cs | 44 ++++- .../StreamMergeAsyncEnumerator.cs | 91 +++++++-- .../IOrderStreamMergeEnumerator.cs | 16 -- .../StreamMergeSync/IStreamMergeEnumerator.cs | 17 -- ...ultiAggregateOrderStreamMergeEnumerator.cs | 183 ------------------ .../MultiOrderStreamMergeEnumerator.cs | 110 ----------- .../OrderStreamMergeEnumerator.cs | 110 ----------- .../PaginationStreamMergeEnumerator.cs | 85 -------- .../StreamMergeSync/StreamMergeEnumerator.cs | 63 ------ .../PaginationMetadata.cs | 2 +- .../Sharding/ReWrite/ReWriteEngine.cs | 4 +- .../EnumeratorShardingQueryExecutor.cs | 1 + .../Sharding/StreamMergeContext.cs | 7 +- .../AsyncEnumerableStreamMergeEngine.cs | 27 +-- ...bstractEnumeratorAsyncStreamMergeEngine.cs | 53 +++-- .../AbstractEnumeratorStreamMergeEngine.cs | 14 +- ...AbstractEnumeratorSyncStreamMergeEngine.cs | 31 --- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 29 +-- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 20 +- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 26 +-- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 23 +-- ...leQueryEnumeratorAsyncStreamMergeEngine.cs | 25 ++- 31 files changed, 446 insertions(+), 835 deletions(-) delete mode 100644 src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs delete mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{ => EnumeratorAsync}/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs (77%) rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{ => EnumeratorAsync}/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs (80%) rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{ => EnumeratorAsync}/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs (84%) rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{ => EnumeratorAsync}/SequenceEnumeratorAsyncStreamMergeEngine.cs (83%) rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{ => EnumeratorAsync}/SingleQueryEnumeratorAsyncStreamMergeEngine.cs (62%) diff --git a/samples/Sample.SqlServer/Controllers/ValuesController.cs b/samples/Sample.SqlServer/Controllers/ValuesController.cs index 518f37e1..37864e99 100644 --- a/samples/Sample.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.SqlServer/Controllers/ValuesController.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; @@ -58,15 +59,15 @@ namespace Sample.SqlServer.Controllers - //var sresultx11231 = _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981"); - //var sresultx1121 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Sum(o => o.Age); - //var sresultx111 = _defaultTableDbContext.Set().FirstOrDefault(o => o.Id == "198"); - //var sresultx2 = _defaultTableDbContext.Set().Count(o => o.Age <= 10); - //var sresultx = _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefault(); - //var sresultx33 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault(); - //var sresultxc = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).ToList(); - //var sresultxasdc = _defaultTableDbContext.Set().Where(o => o.Id == "198").ToList(); - //var sresult = _defaultTableDbContext.Set().ToList(); + var sresultx11231 = _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981"); + var sresultx1121 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Sum(o => o.Age); + var sresultx111 = _defaultTableDbContext.Set().FirstOrDefault(o => o.Id == "198"); + var sresultx2 = _defaultTableDbContext.Set().Count(o => o.Age <= 10); + var sresultx = _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefault(); + var sresultx33 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault(); + var sresultxc = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).ToList(); + var sresultxasdc = _defaultTableDbContext.Set().Where(o => o.Id == "198").ToList(); + var sresult = _defaultTableDbContext.Set().ToList(); var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98"); _defaultTableDbContext.Attach(sysUserMod98); @@ -89,8 +90,28 @@ namespace Sample.SqlServer.Controllers [HttpGet] public async Task Get1([FromQuery] int p,[FromQuery]int s) { - var shardingPageResultAsync = await _defaultTableDbContext.Set().OrderByDescending(o=>o.DateOfMonth).ToShardingPageAsync(p, s); - return Ok(shardingPageResultAsync); + Stopwatch sp = new Stopwatch(); + sp.Start(); + var shardingPageResultAsync = await _defaultTableDbContext.Set().OrderBy(o=>o.Age).ToShardingPageAsync(p, s); + sp.Stop(); + return Ok(new + { + sp.ElapsedMilliseconds, + shardingPageResultAsync + }); + } + [HttpGet] + public IActionResult Get2([FromQuery] int p,[FromQuery]int s) + { + Stopwatch sp = new Stopwatch(); + sp.Start(); + var shardingPageResultAsync = _defaultTableDbContext.Set().OrderBy(o=>o.Age).ToShardingPage(p, s); + sp.Stop(); + return Ok(new + { + sp.ElapsedMilliseconds, + shardingPageResultAsync + }); } } } \ No newline at end of file diff --git a/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs index 1c4ad58b..88cb2e67 100644 --- a/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs +++ b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs @@ -11,8 +11,9 @@ namespace Sample.SqlServer.Shardings { public void Configure(PaginationBuilder builder) { - builder.PaginationSequence(o => o.Id) - .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); + //builder.PaginationSequence(o => o.Age) + // .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); + builder.ConfigReverseShardingPage(reverseTotalGe:900); } } } diff --git a/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs b/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs index b215a245..21b2b4d7 100644 --- a/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs +++ b/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs @@ -11,10 +11,12 @@ namespace Sample.SqlServer.Shardings { public void Configure(PaginationBuilder builder) { - //builder.PaginationSequence(o => o.Id) - // .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); - //builder.PaginationSequence(o => o.DateOfMonth) - // .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone(); + builder.PaginationSequence(o => o.Id) + .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); + builder.PaginationSequence(o => o.DateOfMonth) + .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone(); + builder.PaginationSequence(o => o.Salary) + .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone(); builder.ConfigReverseShardingPage(); } } diff --git a/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs b/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs deleted file mode 100644 index be78fbfd..00000000 --- a/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Threading.Tasks; -using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.Enumerators.StreamMergeSync; - -namespace ShardingCore.Sharding.Abstractions -{ -/* -* @Author: xjm -* @Description: -* @Date: Saturday, 14 August 2021 22:05:40 -* @Email: 326308290@qq.com -*/ - public interface IStreamMergeEngine - { - Task> GetAsyncEnumerator(); - IStreamMergeEnumerator GetEnumerator(); - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/IStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/IStreamMergeAsyncEnumerator.cs index 780aaa72..bc063e0c 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/IStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/IStreamMergeAsyncEnumerator.cs @@ -8,11 +8,12 @@ namespace ShardingCore.Sharding.Enumerators * @Date: Saturday, 14 August 2021 21:21:44 * @Email: 326308290@qq.com */ - public interface IStreamMergeAsyncEnumerator:IAsyncEnumerator + public interface IStreamMergeAsyncEnumerator:IAsyncEnumerator,IEnumerator { bool SkipFirst(); bool HasElement(); T ReallyCurrent { get; } - + T GetCurrent(); + } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs index db5bd03d..6ef6bbb0 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; @@ -13,11 +14,11 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class InMemoryReverseStreamMergeAsyncEnumerator:IStreamMergeAsyncEnumerator + public class InMemoryReverseStreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator { private readonly IStreamMergeAsyncEnumerator _inMemoryStreamMergeAsyncEnumerator; private bool _first = true; - private IEnumerator _reverseEnumerator = Enumerable.Empty().GetEnumerator(); + private IEnumerator _reverseEnumerator; public InMemoryReverseStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator inMemoryStreamMergeAsyncEnumerator) { _inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator; @@ -32,20 +33,44 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync { if (_first) { - ICollection _reverseCollection = new LinkedList(); - while(await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync()) + LinkedList _reverseCollection = new LinkedList(); + while (await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync()) { - _reverseCollection.Add(_inMemoryStreamMergeAsyncEnumerator.Current); + _reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent()); } - _reverseEnumerator = _reverseCollection.Reverse().GetEnumerator(); + _reverseEnumerator = _reverseCollection.GetEnumerator(); _first = false; } return _reverseEnumerator.MoveNext(); } - public T Current => _reverseEnumerator.Current; + public bool MoveNext() + { + if (_first) + { + LinkedList _reverseCollection = new LinkedList(); + while ( _inMemoryStreamMergeAsyncEnumerator.MoveNext()) + { + _reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent()); + } + + _reverseEnumerator = _reverseCollection.GetEnumerator(); + _first = false; + } + + return _reverseEnumerator.MoveNext(); + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => GetCurrent(); public bool SkipFirst() { throw new NotImplementedException(); @@ -57,5 +82,15 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync } public T ReallyCurrent => Current; + public T GetCurrent() + { + return _reverseEnumerator == null ? default : _reverseEnumerator.Current; + } + + public void Dispose() + { + _inMemoryStreamMergeAsyncEnumerator.Dispose(); + _reverseEnumerator.Dispose(); + } } } diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs index 9b7654af..1959298b 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -9,15 +10,15 @@ using ShardingCore.Sharding.Enumerators.AggregateExtensions; namespace ShardingCore.Sharding.Enumerators { -/* -* @Author: xjm -* @Description: -* @Date: Sunday, 15 August 2021 06:43:26 -* @Email: 326308290@qq.com -*/ - public class MultiAggregateOrderStreamMergeAsyncEnumerator:IStreamMergeAsyncEnumerator + /* + * @Author: xjm + * @Description: + * @Date: Sunday, 15 August 2021 06:43:26 + * @Email: 326308290@qq.com + */ + public class MultiAggregateOrderStreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator { - + private readonly StreamMergeContext _mergeContext; private readonly IEnumerable> _enumerators; private readonly PriorityQueue> _queue; @@ -65,10 +66,10 @@ namespace ShardingCore.Sharding.Enumerators if (_queue.IsEmpty()) return false; #if !EFCORE2 - var hasNext = await SetCurrentValue(); + var hasNext = await SetCurrentValueAsync(); #endif #if EFCORE2 - var hasNext = await SetCurrentValue(cancellationToken); + var hasNext = await SetCurrentValueAsync(cancellationToken); #endif if (hasNext) { @@ -89,17 +90,17 @@ namespace ShardingCore.Sharding.Enumerators return true; } #if !EFCORE2 - private async ValueTask SetCurrentValue() + private async ValueTask SetCurrentValueAsync() #endif #if EFCORE2 - private async Task SetCurrentValue(CancellationToken cancellationToken=new CancellationToken()) + private async Task SetCurrentValueAsync(CancellationToken cancellationToken=new CancellationToken()) #endif { CurrentValue = default; var currentValues = new List(); while (EqualWithGroupValues()) { - var current=_queue.Peek().Current; + var current = _queue.Peek().GetCurrent(); currentValues.Add(current); var first = _queue.Poll(); @@ -123,10 +124,46 @@ namespace ShardingCore.Sharding.Enumerators return true; } + public bool MoveNext() + { + if (_queue.IsEmpty()) + return false; + var hasNext = SetCurrentValue(); + if (hasNext) + { + CurrentGroupValues = _queue.IsEmpty() ? new List(0) : GetCurrentGroupValues(_queue.Peek()); + } + return hasNext; + } + private bool SetCurrentValue() + { + CurrentValue = default; + var currentValues = new List(); + while (EqualWithGroupValues()) + { + var current = _queue.Peek().GetCurrent(); + currentValues.Add(current); + var first = _queue.Poll(); + + if (first.MoveNext()) + { + _queue.Offer(first); + } + + if (_queue.IsEmpty()) + { + break; + } + } + + MergeValue(currentValues); + + return true; + } private void MergeValue(List aggregateValues) { - + if (aggregateValues.IsNotEmpty()) { CurrentValue = aggregateValues.First(); @@ -141,16 +178,20 @@ namespace ShardingCore.Sharding.Enumerators if (aggregate.AggregateMethod == nameof(Queryable.Count)) { aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); - } else if (aggregate.AggregateMethod == nameof(Queryable.Sum)) + } + else if (aggregate.AggregateMethod == nameof(Queryable.Sum)) { aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); - } else if (aggregate.AggregateMethod == nameof(Queryable.Max)) + } + else if (aggregate.AggregateMethod == nameof(Queryable.Max)) { aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName); - }else if (aggregate.AggregateMethod == nameof(Queryable.Min)) + } + else if (aggregate.AggregateMethod == nameof(Queryable.Min)) { aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName); - }else if (aggregate.AggregateMethod == nameof(Queryable.Average)) + } + else if (aggregate.AggregateMethod == nameof(Queryable.Average)) { aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName); } @@ -158,7 +199,7 @@ namespace ShardingCore.Sharding.Enumerators { throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation "); } - CurrentValue.SetPropertyValue(aggregate.PropertyName,aggregateValue); + CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue); } } } @@ -176,7 +217,11 @@ namespace ShardingCore.Sharding.Enumerators return ReallyCurrent != null; } - public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent; + public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; + public T GetCurrent() + { + return CurrentValue; + } #if !EFCORE2 @@ -198,6 +243,21 @@ namespace ShardingCore.Sharding.Enumerators } #endif + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + public T Current => CurrentValue; + public void Dispose() + { + foreach (var enumerator in _enumerators) + { + enumerator.Dispose(); + } + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs index 7b1b3185..343c6560 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs @@ -1,3 +1,4 @@ +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -80,6 +81,30 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync _currentEnumerator = _queue.Peek(); return true; } + public bool MoveNext() + { + if (_queue.IsEmpty()) + return false; + if (skipFirst) + { + skipFirst = false; + return true; + } + + var first = _queue.Poll(); + if (first.MoveNext()) + { + _queue.Offer(first); + } + + if (_queue.IsEmpty()) + { + return false; + } + + _currentEnumerator = _queue.Peek(); + return true; + } public bool SkipFirst() @@ -98,6 +123,10 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync } public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; + public T GetCurrent() + { + return _currentEnumerator.GetCurrent(); + } #if !EFCORE2 @@ -121,6 +150,18 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync #endif - public T Current => skipFirst ? default : _currentEnumerator.Current; + + public void Reset() + { + throw new System.NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => skipFirst ? default : _currentEnumerator.GetCurrent(); + public void Dispose() + { + _currentEnumerator.Dispose(); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs index 12239afc..e2715423 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -55,7 +56,26 @@ namespace ShardingCore.Sharding.Enumerators } - public T Current => _enumerator.Current; + public bool MoveNext() + { + var has = _enumerator.MoveNext(); + SetOrderValues(); + return has; + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => GetCurrent(); + + public void Dispose() + { + _enumerator.Dispose(); + } public bool SkipFirst() { @@ -68,6 +88,10 @@ namespace ShardingCore.Sharding.Enumerators } public T ReallyCurrent => _enumerator.ReallyCurrent; + public T GetCurrent() + { + return _enumerator.GetCurrent(); + } private List GetCurrentOrderValues() { diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs index 9aa79792..f12be9eb 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -72,7 +73,40 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync return next; } - public T Current => _enumerator.Current; + public bool MoveNext() + { + //如果合并数据的时候不需要跳过也没有take多少那么就是直接next + while (_skip.GetValueOrDefault() > this.realSkip) + { + var has = _enumerator.MoveNext(); + realSkip++; + if (!has) + return false; + } + + var next = _enumerator.MoveNext(); + + if (next) + { + if (_take.HasValue) + { + realTake++; + if (realTake > _take.Value) + return false; + } + } + + return next; + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => _enumerator.GetCurrent(); public bool SkipFirst() { return _enumerator.SkipFirst(); @@ -84,6 +118,14 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync } public T ReallyCurrent => _enumerator.ReallyCurrent; + public T GetCurrent() + { + return _enumerator.GetCurrent(); + } + public void Dispose() + { + _enumerator.Dispose(); + } #if !EFCORE2 public ValueTask DisposeAsync() diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs index fc574405..fd3864a4 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs @@ -6,20 +6,31 @@ using System.Threading.Tasks; namespace ShardingCore.Sharding.Enumerators { -/* -* @Author: xjm -* @Description: -* @Date: Saturday, 14 August 2021 21:25:50 -* @Email: 326308290@qq.com -*/ - public class StreamMergeAsyncEnumerator:IStreamMergeAsyncEnumerator + /* + * @Author: xjm + * @Description: + * @Date: Saturday, 14 August 2021 21:25:50 + * @Email: 326308290@qq.com + */ + public class StreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator { - private readonly IAsyncEnumerator _source; + private readonly IAsyncEnumerator _asyncSource; + private readonly IEnumerator _syncSource; private bool skip; - public StreamMergeAsyncEnumerator(IAsyncEnumerator source) + public StreamMergeAsyncEnumerator(IAsyncEnumerator asyncSource) { - _source = source; + if (_syncSource != null) + throw new ArgumentNullException(nameof(_syncSource)); + _asyncSource = asyncSource; + skip = true; + } + + public StreamMergeAsyncEnumerator(IEnumerator syncSource) + { + if (_asyncSource != null) + throw new ArgumentNullException(nameof(_asyncSource)); + _syncSource = syncSource; skip = true; } @@ -33,9 +44,10 @@ namespace ShardingCore.Sharding.Enumerators return false; } #if !EFCORE2 - public ValueTask DisposeAsync() + public async ValueTask DisposeAsync() { - return _source.DisposeAsync(); + if (_asyncSource != null) + await _asyncSource.DisposeAsync(); } public async ValueTask MoveNextAsync() @@ -43,22 +55,61 @@ namespace ShardingCore.Sharding.Enumerators if (skip) { skip = false; - return null!=_source.Current; + return null != _asyncSource.Current; } - return await _source.MoveNextAsync(); + return await _asyncSource.MoveNextAsync(); } - public T Current => skip?default:_source.Current; - public T ReallyCurrent => _source.Current; + + public bool MoveNext() + { + if (skip) + { + skip = false; + return null != _syncSource.Current; + } + return _syncSource.MoveNext(); + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => GetCurrent(); + public T ReallyCurrent => GetReallyCurrent(); public bool HasElement() { - return null != _source.Current; + if (_asyncSource != null) return null != _asyncSource.Current; + if (_syncSource != null) return null != _syncSource.Current; + return false; + } + public void Dispose() + { + _syncSource?.Dispose(); + } + + public T GetCurrent() + { + if (skip) + return default; + if (_asyncSource != null) return _asyncSource.Current; + if (_syncSource != null) return _syncSource.Current; + return default; + } + public T GetReallyCurrent() + { + if (_asyncSource != null) return _asyncSource.Current; + if (_syncSource != null) return _syncSource.Current; + return default; } #endif #if EFCORE2 public void Dispose() { - _source.Dispose(); + _asyncSource.Dispose(); } public async Task MoveNext(CancellationToken cancellationToken=new CancellationToken()) @@ -68,7 +119,7 @@ namespace ShardingCore.Sharding.Enumerators skip = false; return null != SourceCurrent(); } - return await _source.MoveNext(cancellationToken); + return await _asyncSource.MoveNext(cancellationToken); } public T Current => skip ? default : SourceCurrent(); public T ReallyCurrent => SourceCurrent(); @@ -83,7 +134,7 @@ namespace ShardingCore.Sharding.Enumerators { if (tryGetCurrentError) return default; - return _source.Current; + return _asyncSource.Current; }catch(Exception e) { tryGetCurrentError = true; diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs deleted file mode 100644 index 50ef08b0..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ -/* -* @Author: xjm -* @Description: -* @Date: Sunday, 15 August 2021 06:44:33 -* @Email: 326308290@qq.com -*/ - public interface IOrderStreamMergeEnumerator:IStreamMergeEnumerator, IComparable> - { - List GetCompares(); - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs deleted file mode 100644 index 0149a001..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System.Collections.Generic; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ -/* -* @Author: xjm -* @Description: -* @Date: Saturday, 14 August 2021 22:06:38 -* @Email: 326308290@qq.com -*/ - public interface IStreamMergeEnumerator:IEnumerator - { - bool SkipFirst(); - bool HasElement(); - T ReallyCurrent { get; } - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs deleted file mode 100644 index 9c697281..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs +++ /dev/null @@ -1,183 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using ShardingCore.Core.Internal.PriorityQueues; -using ShardingCore.Extensions; -using ShardingCore.Sharding.Enumerators.AggregateExtensions; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ - /* - * @Author: xjm - * @Description: - * @Date: Sunday, 15 August 2021 06:43:26 - * @Email: 326308290@qq.com - */ - public class MultiAggregateOrderStreamMergeEnumerator : IStreamMergeEnumerator - { - - private readonly StreamMergeContext _mergeContext; - private readonly IEnumerable> _enumerators; - private readonly PriorityQueue> _queue; - private T CurrentValue; - private List CurrentGroupValues; - private bool _skipFirst; - - public MultiAggregateOrderStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> enumerators) - { - _mergeContext = mergeContext; - _enumerators = enumerators; - _queue = new PriorityQueue>(enumerators.Count()); - _skipFirst = true; - SetOrderEnumerator(); - } - - private void SetOrderEnumerator() - { - foreach (var source in _enumerators) - { - var orderStreamEnumerator = new OrderStreamMergeEnumerator(_mergeContext, source); - if (orderStreamEnumerator.HasElement()) - { - orderStreamEnumerator.SkipFirst(); - _queue.Offer(orderStreamEnumerator); - } - } - //设置第一个元素聚合的属性值 - CurrentGroupValues = _queue.IsEmpty() ? new List(0) : GetCurrentGroupValues(_queue.Peek()); - } - - private List GetCurrentGroupValues(IOrderStreamMergeEnumerator enumerator) - { - var first = enumerator.ReallyCurrent; - return _mergeContext.SelectContext.SelectProperties.Where(o => !o.IsAggregateMethod) - .Select(o => first.GetValueByExpression(o.PropertyName)).ToList(); - } - public bool MoveNext() - { - if (_queue.IsEmpty()) - return false; - var hasNext = SetCurrentValue(); - if (hasNext) - { - CurrentGroupValues = _queue.IsEmpty() ? new List(0) : GetCurrentGroupValues(_queue.Peek()); - } - return hasNext; - } - - public void Reset() - { - throw new NotImplementedException(); - } - - object IEnumerator.Current => Current; - - private bool EqualWithGroupValues() - { - var current = GetCurrentGroupValues(_queue.Peek()); - for (int i = 0; i < CurrentGroupValues.Count; i++) - { - if (!CurrentGroupValues[i].Equals(current[i])) - return false; - } - - return true; - } - private bool SetCurrentValue() - { - CurrentValue = default; - var currentValues = new List(); - while (EqualWithGroupValues()) - { - var current = _queue.Peek().Current; - currentValues.Add(current); - var first = _queue.Poll(); - - if (first.MoveNext()) - { - _queue.Offer(first); - } - - if (_queue.IsEmpty()) - { - break; - } - } - - MergeValue(currentValues); - - return true; - } - - private void MergeValue(List aggregateValues) - { - - if (aggregateValues.IsNotEmpty()) - { - CurrentValue = aggregateValues.First(); - if (aggregateValues.Count > 1) - { - var aggregates = _mergeContext.SelectContext.SelectProperties.Where(o => o.IsAggregateMethod).ToList(); - if (aggregates.IsNotEmpty()) - { - foreach (var aggregate in aggregates) - { - object aggregateValue = null; - if (aggregate.AggregateMethod == nameof(Queryable.Count)) - { - aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); - } - else if (aggregate.AggregateMethod == nameof(Queryable.Sum)) - { - aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); - } - else if (aggregate.AggregateMethod == nameof(Queryable.Max)) - { - aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName); - } - else if (aggregate.AggregateMethod == nameof(Queryable.Min)) - { - aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName); - } - else if (aggregate.AggregateMethod == nameof(Queryable.Average)) - { - aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName); - } - else - { - throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation "); - } - CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue); - } - } - } - } - } - - - public bool SkipFirst() - { - return true; - } - - public bool HasElement() - { - return ReallyCurrent != null; - } - - public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; - - - public void Dispose() - { - foreach (var enumerator in _enumerators) - { - enumerator?.Dispose(); - } - } - - public T Current => CurrentValue; - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs deleted file mode 100644 index bf6445f4..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs +++ /dev/null @@ -1,110 +0,0 @@ -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using ShardingCore.Core.Internal.PriorityQueues; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ - /* - * @Author: xjm - * @Description: - * @Date: Sunday, 15 August 2021 06:49:09 - * @Email: 326308290@qq.com - */ - public class MultiOrderStreamMergeEnumerator : IStreamMergeEnumerator - { - - private readonly StreamMergeContext _mergeContext; - private readonly IEnumerable> _enumerators; - private readonly PriorityQueue> _queue; - private IStreamMergeEnumerator _currentEnumerator; - private bool skipFirst; - - public MultiOrderStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> enumerators) - { - _mergeContext = mergeContext; - _enumerators = enumerators; - _queue = new PriorityQueue>(enumerators.Count()); - skipFirst = true; - SetOrderEnumerator(); - } - - private void SetOrderEnumerator() - { - foreach (var source in _enumerators) - { - var orderStreamEnumerator = new OrderStreamMergeEnumerator(_mergeContext, source); - if (orderStreamEnumerator.HasElement()) - { - orderStreamEnumerator.SkipFirst(); - _queue.Offer(orderStreamEnumerator); - } - } - - _currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek(); - } - public bool MoveNext() - { - if (_queue.IsEmpty()) - return false; - if (skipFirst) - { - skipFirst = false; - return true; - } - - var first = _queue.Poll(); - - if (first.MoveNext()) - { - _queue.Offer(first); - } - - if (_queue.IsEmpty()) - { - return false; - } - - _currentEnumerator = _queue.Peek(); - return true; - } - - public void Reset() - { - throw new System.NotImplementedException(); - } - - object IEnumerator.Current => Current; - - - public bool SkipFirst() - { - if (skipFirst) - { - skipFirst = false; - return true; - } - return false; - } - - public bool HasElement() - { - return _currentEnumerator != null && _currentEnumerator.HasElement(); - } - - public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; - - - public void Dispose() - { - foreach (var enumerator in _enumerators) - { - enumerator?.Dispose(); - } - } - - - public T Current => skipFirst ? default : _currentEnumerator.Current; - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs deleted file mode 100644 index 0dc1b799..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs +++ /dev/null @@ -1,110 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using ShardingCore.Core.Internal.StreamMerge; -using ShardingCore.Extensions; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ -/* -* @Author: xjm -* @Description: -* @Date: Sunday, 15 August 2021 06:46:32 -* @Email: 326308290@qq.com -*/ - public class OrderStreamMergeEnumerator:IOrderStreamMergeEnumerator - { - - /// - /// 合并数据上下文 - /// - private readonly StreamMergeContext _mergeContext; - - private readonly IStreamMergeEnumerator _enumerator; - private List _orderValues; - - public OrderStreamMergeEnumerator(StreamMergeContext mergeContext, IStreamMergeEnumerator enumerator) - { - _mergeContext = mergeContext; - _enumerator = enumerator; - SetOrderValues(); - } - - private void SetOrderValues() - { - _orderValues = HasElement() ? GetCurrentOrderValues() : new List(0); - } - - - public bool MoveNext() - { - var has = _enumerator.MoveNext(); - SetOrderValues(); - return has; - } - - public void Reset() - { - throw new NotImplementedException(); - } - - object IEnumerator.Current => Current; - - - public T Current =>_enumerator.Current; - - public bool SkipFirst() - { - return _enumerator.SkipFirst(); - } - - public bool HasElement() - { - return _enumerator.HasElement(); - } - - public T ReallyCurrent => _enumerator.ReallyCurrent; - - private List GetCurrentOrderValues() - { - if (!_mergeContext.Orders.Any()) - return new List(0); - var list = new List(_mergeContext.Orders.Count()); - foreach (var order in _mergeContext.Orders) - { - var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression); - if (value is IComparable comparable) - list.Add(comparable); - else - throw new NotSupportedException($"order by value [{order}] must implements IComparable"); - } - - return list; - } - - public int CompareTo(IOrderStreamMergeEnumerator other) - { - int i = 0; - foreach (var order in _mergeContext.Orders) { - int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc); - if (0 != result) { - return result; - } - i++; - } - return 0; - } - - public List GetCompares() - { - return _orderValues ?? new List(0); - } - - public void Dispose() - { - _enumerator?.Dispose(); - } - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs deleted file mode 100644 index da4a4456..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs +++ /dev/null @@ -1,85 +0,0 @@ -using System.Collections; -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ - /* - * @Author: xjm - * @Description: - * @Date: Sunday, 15 August 2021 06:39:52 - * @Email: 326308290@qq.com - */ - public class PaginationStreamMergeEnumerator : IStreamMergeEnumerator - { - private readonly StreamMergeContext _mergeContext; - private readonly IStreamMergeEnumerator _enumerator; - private readonly int? _skip; - private readonly int? _take; - private int realSkip = 0; - private int realTake = 0; - - public PaginationStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> sources) - { - _mergeContext = mergeContext; - _skip = mergeContext.Skip; - _take = mergeContext.Take; - if (_mergeContext.HasGroupQuery()) - _enumerator = new MultiAggregateOrderStreamMergeEnumerator(_mergeContext, sources); - else - _enumerator = new MultiOrderStreamMergeEnumerator(_mergeContext, sources); - } - - public bool MoveNext() - { - //如果合并数据的时候不需要跳过也没有take多少那么就是直接next - while (_skip.GetValueOrDefault() > this.realSkip) - { - var has = _enumerator.MoveNext(); - - realSkip++; - if (!has) - return false; - } - - var next = _enumerator.MoveNext(); - - if (next) - { - if (_take.HasValue) - { - realTake++; - if (realTake > _take.Value) - return false; - } - } - - return next; - } - - public void Reset() - { - throw new System.NotImplementedException(); - } - - object IEnumerator.Current => Current; - - public T Current => _enumerator.Current; - public bool SkipFirst() - { - return _enumerator.SkipFirst(); - } - - public bool HasElement() - { - return _enumerator.HasElement(); - } - - public T ReallyCurrent => _enumerator.ReallyCurrent; - - public void Dispose() - { - _enumerator?.Dispose(); - } - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs deleted file mode 100644 index bb5c9596..00000000 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs +++ /dev/null @@ -1,63 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; - -namespace ShardingCore.Sharding.Enumerators.StreamMergeSync -{ -/* -* @Author: xjm -* @Description: -* @Date: Saturday, 14 August 2021 21:25:50 -* @Email: 326308290@qq.com -*/ - public class StreamMergeEnumerator:IStreamMergeEnumerator - { - private readonly IEnumerator _source; - private bool skip; - - public StreamMergeEnumerator(IEnumerator source) - { - _source = source; - skip = true; - } - - public bool MoveNext() - { - if (skip) - { - skip = false; - return null != _source.Current; - } - return _source.MoveNext(); - } - - public void Reset() - { - throw new NotImplementedException(); - } - - object IEnumerator.Current => Current; - - public T Current => skip?default:_source.Current; - public bool SkipFirst() - { - if (skip) - { - skip = false; - return true; - } - return false; - } - - public bool HasElement() - { - return null != _source.Current; - } - - public T ReallyCurrent => _source.Current; - public void Dispose() - { - _source?.Dispose(); - } - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs index 1f9bf7d3..63fbdba5 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs @@ -31,7 +31,7 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// /// 是否已开启反向排序 仅支持单排序 /// - public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 1000; + public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 500; /// /// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL /// diff --git a/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs b/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs index a4e7df72..83f627cc 100644 --- a/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs +++ b/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs @@ -32,12 +32,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.ReWrite var reWriteQueryable = _queryable; if (take.HasValue) { - reWriteQueryable = _queryable.RemoveTake(); + reWriteQueryable = reWriteQueryable.RemoveTake(); } if (skip.HasValue) { - reWriteQueryable = _queryable.RemoveSkip(); + reWriteQueryable = reWriteQueryable.RemoveSkip(); } if (take.HasValue) diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs index 46468d6d..ef97cff5 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs @@ -13,6 +13,7 @@ using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync; namespace ShardingCore.Sharding.ShardingQueryExecutors { diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index a3343204..919d5c01 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -29,7 +29,7 @@ namespace ShardingCore.Sharding private readonly IQueryable _reWriteSource; //public IEnumerable RouteResults { get; } //public DataSourceRoutingResult RoutingResult { get; } - public int? Skip { get;} + public int? Skip { get; private set; } public int? Take { get; } public IEnumerable Orders { get; private set; } @@ -72,6 +72,11 @@ namespace ShardingCore.Sharding { Orders = orders; } + + public void ReSetSkip(int? skip) + { + Skip = skip; + } public DbContext CreateDbContext(RouteResult routeResult) { var routeTail = _routeTailFactory.Create(routeResult); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index 9e4924a0..7aa5a660 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -1,17 +1,7 @@ -using System; +using ShardingCore.Sharding.ShardingQueryExecutors; using System.Collections; using System.Collections.Generic; -using System.Linq; using System.Threading; -using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; -using ShardingCore.Exceptions; -using ShardingCore.Extensions; -using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.Enumerators.StreamMergeAsync; -using ShardingCore.Sharding.Enumerators.StreamMergeSync; -using ShardingCore.Sharding.ShardingQueryExecutors; #if EFCORE2 using Microsoft.EntityFrameworkCore.Extensions.Internal; @@ -36,13 +26,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines #if !EFCORE2 - - private async Task> GetAsyncEnumerator(IQueryable newQueryable) - { - var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); - await enumator.MoveNextAsync(); - return enumator; - } public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) { return new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync(cancellationToken) @@ -64,14 +47,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines #endif - - private IEnumerator GetEnumerator(IQueryable newQueryable) - { - var enumator = newQueryable.AsEnumerable().GetEnumerator(); - enumator.MoveNext(); - return enumator; - } - public IEnumerator GetEnumerator() { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs index b508e885..370a86f9 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs @@ -20,28 +20,53 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Ver: 1.0 * @Email: 326308290@qq.com */ - public abstract class AbstractEnumeratorAsyncStreamMergeEngine: AbstractEnumeratorStreamMergeEngine + public abstract class AbstractEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine { public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } - public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) - { - var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(); - if (dbStreamMergeAsyncEnumerators.IsEmpty()) - throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty"); - return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators); - } - public abstract IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(); + public abstract IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async); public abstract IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators); + + public Task> AsyncQueryEnumerator(IQueryable queryable,bool async) + { + return Task.Run(async () => + { + try + { + if (async) + { + var asyncEnumerator = await DoGetAsyncEnumerator(queryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + else + { + var enumerator = DoGetEnumerator(queryable); + return new StreamMergeAsyncEnumerator(enumerator); + + } + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + } public async Task> DoGetAsyncEnumerator(IQueryable newQueryable) { var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); await enumator.MoveNextAsync(); return enumator; } + public IEnumerator DoGetEnumerator(IQueryable newQueryable) + { + var enumator = newQueryable.AsEnumerable().GetEnumerator(); + enumator.MoveNext(); + return enumator; + } // public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult) // { // var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); @@ -51,10 +76,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. // .ReplaceDbContextQueryable(shardingDbContext); // return newQueryable; // } - - public override IEnumerator GetEnumerator() + + public override IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, + CancellationToken cancellationToken = new CancellationToken()) { - throw new NotImplementedException(); + var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(async); + if (dbStreamMergeAsyncEnumerators.IsEmpty()) + throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty"); + return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators); } } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs index 945474ef..1b5c0f84 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions { @@ -28,10 +29,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. DbContextQueryStore = new ConcurrentDictionary(); } - public abstract IAsyncEnumerator GetAsyncEnumerator( + public abstract IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, CancellationToken cancellationToken = new CancellationToken()); - public abstract IEnumerator GetEnumerator(); + public IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = new CancellationToken()) + { + return GetShardingAsyncEnumerator(true,cancellationToken); + } + + public IEnumerator GetEnumerator() + { + return GetShardingAsyncEnumerator(false); + } IEnumerator IEnumerable.GetEnumerator() { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs deleted file mode 100644 index 94270b0e..00000000 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; - -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions -{ - /* - * @Author: xjm - * @Description: - * @Date: 2021/9/2 15:38:13 - * @Ver: 1.0 - * @Email: 326308290@qq.com - */ - public class AbstractEnumeratorSyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine - { - public AbstractEnumeratorSyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) - { - } - - public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) - { - throw new NotImplementedException(); - } - - public override IEnumerator GetEnumerator() - { - throw new NotImplementedException(); - } - } -} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs similarity index 77% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs index bc587bb0..5f2a3104 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs @@ -1,12 +1,9 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; +using System.Threading; using System.Threading.Tasks; using ShardingCore.Core.Internal.Visitors; -using ShardingCore.Core.ShardingPage.Abstractions; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; -using ShardingCore.Core.VirtualTables; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; @@ -15,7 +12,7 @@ using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -28,17 +25,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines { private readonly PaginationConfig _appendPaginationConfig; private readonly ICollection> _routeQueryResults; - private IShardingPageManager _shardingPageManager; - private IVirtualTableManager _virtualTableManager; public AppenOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, PaginationConfig appendPaginationConfig, ICollection> routeQueryResults) : base(streamMergeContext) { _appendPaginationConfig = appendPaginationConfig; _routeQueryResults = routeQueryResults; - _shardingPageManager = ShardingContainer.GetService(); - _virtualTableManager = ShardingContainer.GetService(); } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) { var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); var skip = StreamMergeContext.Skip.GetValueOrDefault(); @@ -58,23 +51,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o=>o.RouteQueryResult)).Skip(skip).Take(take).ToList(); - StreamMergeContext.ReSetOrders(new PropertyOrder[] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) }); + StreamMergeContext.ReSetOrders(new [] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) }); var enumeratorTasks = sequenceResults.Select(sequenceResult => { var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult); - return Task.Run(async () => - { - try - { - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); + return AsyncQueryEnumerator(newQueryable,async); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs similarity index 80% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs index 7696322a..ab1946d9 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,7 +1,5 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Extensions; @@ -9,7 +7,7 @@ using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -24,25 +22,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines { } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) { var tableResult = StreamMergeContext.RouteResults; var enumeratorTasks = tableResult.Select(routeResult => { var newQueryable = CreateAsyncExecuteQueryable(routeResult); - return Task.Run(async () => - { - try - { - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); + return AsyncQueryEnumerator(newQueryable, async); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs similarity index 84% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs index bb5ae50f..89fcf602 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; @@ -10,7 +9,7 @@ using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -30,7 +29,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines _total = total; } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) { var noPaginationNoOrderQueryable = _primaryOrder.IsAsc ? StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderBy(): StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderByDescending(); @@ -38,26 +37,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines var take = StreamMergeContext.Take.GetValueOrDefault(); var realSkip = _total- take- skip; var tableResult = StreamMergeContext.RouteResults; - var reverseOrderQueryable = noPaginationNoOrderQueryable.Skip((int)realSkip).Take((int)realSkip+take).OrderWithExpression(new List() + StreamMergeContext.ReSetSkip((int)realSkip); + var propertyOrders = new List() { new PropertyOrder( _primaryOrder.PropertyExpression,!_primaryOrder.IsAsc) - }); + }; + StreamMergeContext.ReSetOrders(propertyOrders); + var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+take).OrderWithExpression(propertyOrders); var enumeratorTasks = tableResult.Select(routeResult => { var newQueryable = CreateAsyncExecuteQueryable(reverseOrderQueryable,routeResult); - return Task.Run(async () => - { - try - { - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); + return AsyncQueryEnumerator(newQueryable,async); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs similarity index 83% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs index cc3d3f60..918169be 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -1,12 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; -using ShardingCore.Core.Internal.Visitors; -using ShardingCore.Core.ShardingPage.Abstractions; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; -using ShardingCore.Core.VirtualTables; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Extensions.InternalExtensions; @@ -16,7 +11,7 @@ using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -37,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines _isAsc = isAsc; } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) { var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); var skip = StreamMergeContext.Skip.GetValueOrDefault(); @@ -60,19 +55,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines var enumeratorTasks = sequenceResults.Select(sequenceResult => { var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult); - return Task.Run(async () => - { - try - { - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); + return AsyncQueryEnumerator(newQueryable, async); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs similarity index 62% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs index 7a5b3aa8..5b414000 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs @@ -1,13 +1,11 @@ -using System; using System.Linq; -using System.Threading.Tasks; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -21,21 +19,30 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines { } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) { var routeResult = StreamMergeContext.RouteResults.First(); var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); DbContextQueryStore.TryAdd(routeResult, shardingDbContext); var newQueryable = (IQueryable) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext); - - var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException(); - return new[] {new StreamMergeAsyncEnumerator(asyncEnumerator)}; + if (async) + { + var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException(); + return new[] { new StreamMergeAsyncEnumerator(asyncEnumerator) }; + } + else + { + var enumerator = DoGetEnumerator(newQueryable); + return new[] { new StreamMergeAsyncEnumerator(enumerator) }; + } } public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { - return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + if (streamsAsyncEnumerators.Length != 1) + throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine)} has more {nameof(IStreamMergeAsyncEnumerator)}"); + return streamsAsyncEnumerators[0]; } } } \ No newline at end of file