prefect combine async sync

This commit is contained in:
xuejiaming 2021-09-03 21:23:26 +08:00
parent fbc2eb492b
commit 7f064523d8
31 changed files with 446 additions and 835 deletions

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
@ -58,15 +59,15 @@ namespace Sample.SqlServer.Controllers
//var sresultx11231 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981");
//var sresultx1121 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Sum(o => o.Age);
//var sresultx111 = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == "198");
//var sresultx2 = _defaultTableDbContext.Set<SysUserMod>().Count(o => o.Age <= 10);
//var sresultx = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefault();
//var sresultx33 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault();
//var sresultxc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).ToList();
//var sresultxasdc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToList();
//var sresult = _defaultTableDbContext.Set<SysUserMod>().ToList();
var sresultx11231 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981");
var sresultx1121 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Sum(o => o.Age);
var sresultx111 = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == "198");
var sresultx2 = _defaultTableDbContext.Set<SysUserMod>().Count(o => o.Age <= 10);
var sresultx = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefault();
var sresultx33 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault();
var sresultxc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).ToList();
var sresultxasdc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToList();
var sresult = _defaultTableDbContext.Set<SysUserMod>().ToList();
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");
_defaultTableDbContext.Attach(sysUserMod98);
@ -89,8 +90,28 @@ namespace Sample.SqlServer.Controllers
[HttpGet]
public async Task<IActionResult> Get1([FromQuery] int p,[FromQuery]int s)
{
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserSalary>().OrderByDescending(o=>o.DateOfMonth).ToShardingPageAsync(p, s);
return Ok(shardingPageResultAsync);
Stopwatch sp = new Stopwatch();
sp.Start();
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToShardingPageAsync(p, s);
sp.Stop();
return Ok(new
{
sp.ElapsedMilliseconds,
shardingPageResultAsync
});
}
[HttpGet]
public IActionResult Get2([FromQuery] int p,[FromQuery]int s)
{
Stopwatch sp = new Stopwatch();
sp.Start();
var shardingPageResultAsync = _defaultTableDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToShardingPage(p, s);
sp.Stop();
return Ok(new
{
sp.ElapsedMilliseconds,
shardingPageResultAsync
});
}
}
}

View File

@ -11,8 +11,9 @@ namespace Sample.SqlServer.Shardings
{
public void Configure(PaginationBuilder<SysUserMod> builder)
{
builder.PaginationSequence(o => o.Id)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
//builder.PaginationSequence(o => o.Age)
// .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
builder.ConfigReverseShardingPage(reverseTotalGe:900);
}
}
}

View File

@ -11,10 +11,12 @@ namespace Sample.SqlServer.Shardings
{
public void Configure(PaginationBuilder<SysUserSalary> builder)
{
//builder.PaginationSequence(o => o.Id)
// .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
//builder.PaginationSequence(o => o.DateOfMonth)
// .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone();
builder.PaginationSequence(o => o.Id)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
builder.PaginationSequence(o => o.DateOfMonth)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone();
builder.PaginationSequence(o => o.Salary)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone();
builder.ConfigReverseShardingPage();
}
}

View File

@ -1,19 +0,0 @@
using System;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeSync;
namespace ShardingCore.Sharding.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 22:05:40
* @Email: 326308290@qq.com
*/
public interface IStreamMergeEngine<T>
{
Task<IStreamMergeAsyncEnumerator<T>> GetAsyncEnumerator();
IStreamMergeEnumerator<T> GetEnumerator();
}
}

View File

@ -8,11 +8,12 @@ namespace ShardingCore.Sharding.Enumerators
* @Date: Saturday, 14 August 2021 21:21:44
* @Email: 326308290@qq.com
*/
public interface IStreamMergeAsyncEnumerator<T>:IAsyncEnumerator<T>
public interface IStreamMergeAsyncEnumerator<T>:IAsyncEnumerator<T>,IEnumerator<T>
{
bool SkipFirst();
bool HasElement();
T ReallyCurrent { get; }
T GetCurrent();
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -13,11 +14,11 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class InMemoryReverseStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
public class InMemoryReverseStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly IStreamMergeAsyncEnumerator<T> _inMemoryStreamMergeAsyncEnumerator;
private bool _first = true;
private IEnumerator<T> _reverseEnumerator = Enumerable.Empty<T>().GetEnumerator();
private IEnumerator<T> _reverseEnumerator;
public InMemoryReverseStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> inMemoryStreamMergeAsyncEnumerator)
{
_inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator;
@ -32,20 +33,44 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
if (_first)
{
ICollection<T> _reverseCollection = new LinkedList<T>();
while(await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync())
LinkedList<T> _reverseCollection = new LinkedList<T>();
while (await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync())
{
_reverseCollection.Add(_inMemoryStreamMergeAsyncEnumerator.Current);
_reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent());
}
_reverseEnumerator = _reverseCollection.Reverse().GetEnumerator();
_reverseEnumerator = _reverseCollection.GetEnumerator();
_first = false;
}
return _reverseEnumerator.MoveNext();
}
public T Current => _reverseEnumerator.Current;
public bool MoveNext()
{
if (_first)
{
LinkedList<T> _reverseCollection = new LinkedList<T>();
while ( _inMemoryStreamMergeAsyncEnumerator.MoveNext())
{
_reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent());
}
_reverseEnumerator = _reverseCollection.GetEnumerator();
_first = false;
}
return _reverseEnumerator.MoveNext();
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();
public bool SkipFirst()
{
throw new NotImplementedException();
@ -57,5 +82,15 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
public T ReallyCurrent => Current;
public T GetCurrent()
{
return _reverseEnumerator == null ? default : _reverseEnumerator.Current;
}
public void Dispose()
{
_inMemoryStreamMergeAsyncEnumerator.Dispose();
_reverseEnumerator.Dispose();
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@ -9,15 +10,15 @@ using ShardingCore.Sharding.Enumerators.AggregateExtensions;
namespace ShardingCore.Sharding.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:43:26
* @Email: 326308290@qq.com
*/
public class MultiAggregateOrderStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:43:26
* @Email: 326308290@qq.com
*/
public class MultiAggregateOrderStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeAsyncEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>> _queue;
@ -65,10 +66,10 @@ namespace ShardingCore.Sharding.Enumerators
if (_queue.IsEmpty())
return false;
#if !EFCORE2
var hasNext = await SetCurrentValue();
var hasNext = await SetCurrentValueAsync();
#endif
#if EFCORE2
var hasNext = await SetCurrentValue(cancellationToken);
var hasNext = await SetCurrentValueAsync(cancellationToken);
#endif
if (hasNext)
{
@ -89,17 +90,17 @@ namespace ShardingCore.Sharding.Enumerators
return true;
}
#if !EFCORE2
private async ValueTask<bool> SetCurrentValue()
private async ValueTask<bool> SetCurrentValueAsync()
#endif
#if EFCORE2
private async Task<bool> SetCurrentValue(CancellationToken cancellationToken=new CancellationToken())
private async Task<bool> SetCurrentValueAsync(CancellationToken cancellationToken=new CancellationToken())
#endif
{
CurrentValue = default;
var currentValues = new List<T>();
while (EqualWithGroupValues())
{
var current=_queue.Peek().Current;
var current = _queue.Peek().GetCurrent();
currentValues.Add(current);
var first = _queue.Poll();
@ -123,10 +124,46 @@ namespace ShardingCore.Sharding.Enumerators
return true;
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
var hasNext = SetCurrentValue();
if (hasNext)
{
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
}
return hasNext;
}
private bool SetCurrentValue()
{
CurrentValue = default;
var currentValues = new List<T>();
while (EqualWithGroupValues())
{
var current = _queue.Peek().GetCurrent();
currentValues.Add(current);
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
break;
}
}
MergeValue(currentValues);
return true;
}
private void MergeValue(List<T> aggregateValues)
{
if (aggregateValues.IsNotEmpty())
{
CurrentValue = aggregateValues.First();
@ -141,16 +178,20 @@ namespace ShardingCore.Sharding.Enumerators
if (aggregate.AggregateMethod == nameof(Queryable.Count))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
} else if (aggregate.AggregateMethod == nameof(Queryable.Sum))
}
else if (aggregate.AggregateMethod == nameof(Queryable.Sum))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
} else if (aggregate.AggregateMethod == nameof(Queryable.Max))
}
else if (aggregate.AggregateMethod == nameof(Queryable.Max))
{
aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName);
}else if (aggregate.AggregateMethod == nameof(Queryable.Min))
}
else if (aggregate.AggregateMethod == nameof(Queryable.Min))
{
aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName);
}else if (aggregate.AggregateMethod == nameof(Queryable.Average))
}
else if (aggregate.AggregateMethod == nameof(Queryable.Average))
{
aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName);
}
@ -158,7 +199,7 @@ namespace ShardingCore.Sharding.Enumerators
{
throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation ");
}
CurrentValue.SetPropertyValue(aggregate.PropertyName,aggregateValue);
CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue);
}
}
}
@ -176,7 +217,11 @@ namespace ShardingCore.Sharding.Enumerators
return ReallyCurrent != null;
}
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public T GetCurrent()
{
return CurrentValue;
}
#if !EFCORE2
@ -198,6 +243,21 @@ namespace ShardingCore.Sharding.Enumerators
}
#endif
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => CurrentValue;
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
}
}

View File

@ -1,3 +1,4 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@ -80,6 +81,30 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
_currentEnumerator = _queue.Peek();
return true;
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek();
return true;
}
public bool SkipFirst()
@ -98,6 +123,10 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public T GetCurrent()
{
return _currentEnumerator.GetCurrent();
}
#if !EFCORE2
@ -121,6 +150,18 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
#endif
public T Current => skipFirst ? default : _currentEnumerator.Current;
public void Reset()
{
throw new System.NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => skipFirst ? default : _currentEnumerator.GetCurrent();
public void Dispose()
{
_currentEnumerator.Dispose();
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@ -55,7 +56,26 @@ namespace ShardingCore.Sharding.Enumerators
}
public T Current => _enumerator.Current;
public bool MoveNext()
{
var has = _enumerator.MoveNext();
SetOrderValues();
return has;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();
public void Dispose()
{
_enumerator.Dispose();
}
public bool SkipFirst()
{
@ -68,6 +88,10 @@ namespace ShardingCore.Sharding.Enumerators
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public T GetCurrent()
{
return _enumerator.GetCurrent();
}
private List<IComparable> GetCurrentOrderValues()
{

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@ -72,7 +73,40 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
return next;
}
public T Current => _enumerator.Current;
public bool MoveNext()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
while (_skip.GetValueOrDefault() > this.realSkip)
{
var has = _enumerator.MoveNext();
realSkip++;
if (!has)
return false;
}
var next = _enumerator.MoveNext();
if (next)
{
if (_take.HasValue)
{
realTake++;
if (realTake > _take.Value)
return false;
}
}
return next;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => _enumerator.GetCurrent();
public bool SkipFirst()
{
return _enumerator.SkipFirst();
@ -84,6 +118,14 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public T GetCurrent()
{
return _enumerator.GetCurrent();
}
public void Dispose()
{
_enumerator.Dispose();
}
#if !EFCORE2
public ValueTask DisposeAsync()

View File

@ -6,20 +6,31 @@ using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 21:25:50
* @Email: 326308290@qq.com
*/
public class StreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 21:25:50
* @Email: 326308290@qq.com
*/
public class StreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly IAsyncEnumerator<T> _source;
private readonly IAsyncEnumerator<T> _asyncSource;
private readonly IEnumerator<T> _syncSource;
private bool skip;
public StreamMergeAsyncEnumerator(IAsyncEnumerator<T> source)
public StreamMergeAsyncEnumerator(IAsyncEnumerator<T> asyncSource)
{
_source = source;
if (_syncSource != null)
throw new ArgumentNullException(nameof(_syncSource));
_asyncSource = asyncSource;
skip = true;
}
public StreamMergeAsyncEnumerator(IEnumerator<T> syncSource)
{
if (_asyncSource != null)
throw new ArgumentNullException(nameof(_asyncSource));
_syncSource = syncSource;
skip = true;
}
@ -33,9 +44,10 @@ namespace ShardingCore.Sharding.Enumerators
return false;
}
#if !EFCORE2
public ValueTask DisposeAsync()
public async ValueTask DisposeAsync()
{
return _source.DisposeAsync();
if (_asyncSource != null)
await _asyncSource.DisposeAsync();
}
public async ValueTask<bool> MoveNextAsync()
@ -43,22 +55,61 @@ namespace ShardingCore.Sharding.Enumerators
if (skip)
{
skip = false;
return null!=_source.Current;
return null != _asyncSource.Current;
}
return await _source.MoveNextAsync();
return await _asyncSource.MoveNextAsync();
}
public T Current => skip?default:_source.Current;
public T ReallyCurrent => _source.Current;
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _syncSource.Current;
}
return _syncSource.MoveNext();
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();
public T ReallyCurrent => GetReallyCurrent();
public bool HasElement()
{
return null != _source.Current;
if (_asyncSource != null) return null != _asyncSource.Current;
if (_syncSource != null) return null != _syncSource.Current;
return false;
}
public void Dispose()
{
_syncSource?.Dispose();
}
public T GetCurrent()
{
if (skip)
return default;
if (_asyncSource != null) return _asyncSource.Current;
if (_syncSource != null) return _syncSource.Current;
return default;
}
public T GetReallyCurrent()
{
if (_asyncSource != null) return _asyncSource.Current;
if (_syncSource != null) return _syncSource.Current;
return default;
}
#endif
#if EFCORE2
public void Dispose()
{
_source.Dispose();
_asyncSource.Dispose();
}
public async Task<bool> MoveNext(CancellationToken cancellationToken=new CancellationToken())
@ -68,7 +119,7 @@ namespace ShardingCore.Sharding.Enumerators
skip = false;
return null != SourceCurrent();
}
return await _source.MoveNext(cancellationToken);
return await _asyncSource.MoveNext(cancellationToken);
}
public T Current => skip ? default : SourceCurrent();
public T ReallyCurrent => SourceCurrent();
@ -83,7 +134,7 @@ namespace ShardingCore.Sharding.Enumerators
{
if (tryGetCurrentError)
return default;
return _source.Current;
return _asyncSource.Current;
}catch(Exception e)
{
tryGetCurrentError = true;

View File

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:44:33
* @Email: 326308290@qq.com
*/
public interface IOrderStreamMergeEnumerator<T>:IStreamMergeEnumerator<T>, IComparable<IOrderStreamMergeEnumerator<T>>
{
List<IComparable> GetCompares();
}
}

View File

@ -1,17 +0,0 @@
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 22:06:38
* @Email: 326308290@qq.com
*/
public interface IStreamMergeEnumerator<T>:IEnumerator<T>
{
bool SkipFirst();
bool HasElement();
T ReallyCurrent { get; }
}
}

View File

@ -1,183 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:43:26
* @Email: 326308290@qq.com
*/
public class MultiAggregateOrderStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeEnumerator<T>> _queue;
private T CurrentValue;
private List<object> CurrentGroupValues;
private bool _skipFirst;
public MultiAggregateOrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> enumerators)
{
_mergeContext = mergeContext;
_enumerators = enumerators;
_queue = new PriorityQueue<IOrderStreamMergeEnumerator<T>>(enumerators.Count());
_skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
{
orderStreamEnumerator.SkipFirst();
_queue.Offer(orderStreamEnumerator);
}
}
//设置第一个元素聚合的属性值
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
}
private List<object> GetCurrentGroupValues(IOrderStreamMergeEnumerator<T> enumerator)
{
var first = enumerator.ReallyCurrent;
return _mergeContext.SelectContext.SelectProperties.Where(o => !o.IsAggregateMethod)
.Select(o => first.GetValueByExpression(o.PropertyName)).ToList();
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
var hasNext = SetCurrentValue();
if (hasNext)
{
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
}
return hasNext;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
private bool EqualWithGroupValues()
{
var current = GetCurrentGroupValues(_queue.Peek());
for (int i = 0; i < CurrentGroupValues.Count; i++)
{
if (!CurrentGroupValues[i].Equals(current[i]))
return false;
}
return true;
}
private bool SetCurrentValue()
{
CurrentValue = default;
var currentValues = new List<T>();
while (EqualWithGroupValues())
{
var current = _queue.Peek().Current;
currentValues.Add(current);
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
break;
}
}
MergeValue(currentValues);
return true;
}
private void MergeValue(List<T> aggregateValues)
{
if (aggregateValues.IsNotEmpty())
{
CurrentValue = aggregateValues.First();
if (aggregateValues.Count > 1)
{
var aggregates = _mergeContext.SelectContext.SelectProperties.Where(o => o.IsAggregateMethod).ToList();
if (aggregates.IsNotEmpty())
{
foreach (var aggregate in aggregates)
{
object aggregateValue = null;
if (aggregate.AggregateMethod == nameof(Queryable.Count))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Sum))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Max))
{
aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Min))
{
aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Average))
{
aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName);
}
else
{
throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation ");
}
CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue);
}
}
}
}
}
public bool SkipFirst()
{
return true;
}
public bool HasElement()
{
return ReallyCurrent != null;
}
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator?.Dispose();
}
}
public T Current => CurrentValue;
}
}

View File

@ -1,110 +0,0 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:49:09
* @Email: 326308290@qq.com
*/
public class MultiOrderStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeEnumerator<T>> _queue;
private IStreamMergeEnumerator<T> _currentEnumerator;
private bool skipFirst;
public MultiOrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> enumerators)
{
_mergeContext = mergeContext;
_enumerators = enumerators;
_queue = new PriorityQueue<IOrderStreamMergeEnumerator<T>>(enumerators.Count());
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
{
orderStreamEnumerator.SkipFirst();
_queue.Offer(orderStreamEnumerator);
}
}
_currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek();
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek();
return true;
}
public void Reset()
{
throw new System.NotImplementedException();
}
object IEnumerator.Current => Current;
public bool SkipFirst()
{
if (skipFirst)
{
skipFirst = false;
return true;
}
return false;
}
public bool HasElement()
{
return _currentEnumerator != null && _currentEnumerator.HasElement();
}
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator?.Dispose();
}
}
public T Current => skipFirst ? default : _currentEnumerator.Current;
}
}

View File

@ -1,110 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:46:32
* @Email: 326308290@qq.com
*/
public class OrderStreamMergeEnumerator<T>:IOrderStreamMergeEnumerator<T>
{
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeEnumerator<T> _enumerator;
private List<IComparable> _orderValues;
public OrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IStreamMergeEnumerator<T> enumerator)
{
_mergeContext = mergeContext;
_enumerator = enumerator;
SetOrderValues();
}
private void SetOrderValues()
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List<IComparable>(0);
}
public bool MoveNext()
{
var has = _enumerator.MoveNext();
SetOrderValues();
return has;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current =>_enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
private List<IComparable> GetCurrentOrderValues()
{
if (!_mergeContext.Orders.Any())
return new List<IComparable>(0);
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
throw new NotSupportedException($"order by value [{order}] must implements IComparable");
}
return list;
}
public int CompareTo(IOrderStreamMergeEnumerator<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc);
if (0 != result) {
return result;
}
i++;
}
return 0;
}
public List<IComparable> GetCompares()
{
return _orderValues ?? new List<IComparable>(0);
}
public void Dispose()
{
_enumerator?.Dispose();
}
}
}

View File

@ -1,85 +0,0 @@
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:39:52
* @Email: 326308290@qq.com
*/
public class PaginationStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeEnumerator<T> _enumerator;
private readonly int? _skip;
private readonly int? _take;
private int realSkip = 0;
private int realTake = 0;
public PaginationStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_skip = mergeContext.Skip;
_take = mergeContext.Take;
if (_mergeContext.HasGroupQuery())
_enumerator = new MultiAggregateOrderStreamMergeEnumerator<T>(_mergeContext, sources);
else
_enumerator = new MultiOrderStreamMergeEnumerator<T>(_mergeContext, sources);
}
public bool MoveNext()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
while (_skip.GetValueOrDefault() > this.realSkip)
{
var has = _enumerator.MoveNext();
realSkip++;
if (!has)
return false;
}
var next = _enumerator.MoveNext();
if (next)
{
if (_take.HasValue)
{
realTake++;
if (realTake > _take.Value)
return false;
}
}
return next;
}
public void Reset()
{
throw new System.NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => _enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public void Dispose()
{
_enumerator?.Dispose();
}
}
}

View File

@ -1,63 +0,0 @@
using System;
using System.Collections;
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 21:25:50
* @Email: 326308290@qq.com
*/
public class StreamMergeEnumerator<T>:IStreamMergeEnumerator<T>
{
private readonly IEnumerator<T> _source;
private bool skip;
public StreamMergeEnumerator(IEnumerator<T> source)
{
_source = source;
skip = true;
}
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _source.Current;
}
return _source.MoveNext();
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => skip?default:_source.Current;
public bool SkipFirst()
{
if (skip)
{
skip = false;
return true;
}
return false;
}
public bool HasElement()
{
return null != _source.Current;
}
public T ReallyCurrent => _source.Current;
public void Dispose()
{
_source?.Dispose();
}
}
}

View File

@ -31,7 +31,7 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <summary>
/// 是否已开启反向排序 仅支持单排序
/// </summary>
public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 1000;
public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 500;
/// <summary>
/// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL
/// </summary>

View File

@ -32,12 +32,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
var reWriteQueryable = _queryable;
if (take.HasValue)
{
reWriteQueryable = _queryable.RemoveTake();
reWriteQueryable = reWriteQueryable.RemoveTake();
}
if (skip.HasValue)
{
reWriteQueryable = _queryable.RemoveSkip();
reWriteQueryable = reWriteQueryable.RemoveSkip();
}
if (take.HasValue)

View File

@ -13,6 +13,7 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{

View File

@ -29,7 +29,7 @@ namespace ShardingCore.Sharding
private readonly IQueryable<T> _reWriteSource;
//public IEnumerable<RouteResult> RouteResults { get; }
//public DataSourceRoutingResult RoutingResult { get; }
public int? Skip { get;}
public int? Skip { get; private set; }
public int? Take { get; }
public IEnumerable<PropertyOrder> Orders { get; private set; }
@ -72,6 +72,11 @@ namespace ShardingCore.Sharding
{
Orders = orders;
}
public void ReSetSkip(int? skip)
{
Skip = skip;
}
public DbContext CreateDbContext(RouteResult routeResult)
{
var routeTail = _routeTailFactory.Create(routeResult);

View File

@ -1,17 +1,7 @@
using System;
using ShardingCore.Sharding.ShardingQueryExecutors;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeSync;
using ShardingCore.Sharding.ShardingQueryExecutors;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@ -36,13 +26,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
#if !EFCORE2
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
return new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync(cancellationToken)
@ -64,14 +47,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
#endif
private IEnumerator<T> GetEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
public IEnumerator<T> GetEnumerator()
{

View File

@ -20,28 +20,53 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorAsyncStreamMergeEngine<TEntity>: AbstractEnumeratorStreamMergeEngine<TEntity>
public abstract class AbstractEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
{
public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IAsyncEnumerator<TEntity> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators();
if (dbStreamMergeAsyncEnumerators.IsEmpty())
throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty");
return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators);
}
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators();
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async);
public abstract IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncQueryEnumerator(IQueryable<TEntity> queryable,bool async)
{
return Task.Run(async () =>
{
try
{
if (async)
{
var asyncEnumerator = await DoGetAsyncEnumerator(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = DoGetEnumerator(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}
public async Task<IAsyncEnumerator<TEntity>> DoGetAsyncEnumerator(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public IEnumerator<TEntity> DoGetEnumerator(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
// public virtual IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult)
// {
// var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
@ -51,10 +76,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
// .ReplaceDbContextQueryable(shardingDbContext);
// return newQueryable;
// }
public override IEnumerator<TEntity> GetEnumerator()
public override IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(async);
if (dbStreamMergeAsyncEnumerators.IsEmpty())
throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty");
return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
@ -28,10 +29,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
DbContextQueryStore = new ConcurrentDictionary<RouteResult, DbContext>();
}
public abstract IAsyncEnumerator<TEntity> GetAsyncEnumerator(
public abstract IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken());
public abstract IEnumerator<TEntity> GetEnumerator();
public IAsyncEnumerator<TEntity> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
return GetShardingAsyncEnumerator(true,cancellationToken);
}
public IEnumerator<TEntity> GetEnumerator()
{
return GetShardingAsyncEnumerator(false);
}
IEnumerator IEnumerable.GetEnumerator()
{

View File

@ -1,31 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:38:13
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AbstractEnumeratorSyncStreamMergeEngine<TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
{
public AbstractEnumeratorSyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IAsyncEnumerator<TEntity> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}
public override IEnumerator<TEntity> GetEnumerator()
{
throw new NotImplementedException();
}
}
}

View File

@ -1,12 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
@ -15,7 +12,7 @@ using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -28,17 +25,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
private readonly PaginationConfig _appendPaginationConfig;
private readonly ICollection<RouteQueryResult<long>> _routeQueryResults;
private IShardingPageManager _shardingPageManager;
private IVirtualTableManager _virtualTableManager;
public AppenOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationConfig appendPaginationConfig, ICollection<RouteQueryResult<long>> routeQueryResults) : base(streamMergeContext)
{
_appendPaginationConfig = appendPaginationConfig;
_routeQueryResults = routeQueryResults;
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
@ -58,23 +51,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o=>o.RouteQueryResult)).Skip(skip).Take(take).ToList();
StreamMergeContext.ReSetOrders(new PropertyOrder[] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) });
StreamMergeContext.ReSetOrders(new [] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) });
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
return AsyncQueryEnumerator(newQueryable,async);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

View File

@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
@ -9,7 +7,7 @@ using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -24,25 +22,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var tableResult = StreamMergeContext.RouteResults;
var enumeratorTasks = tableResult.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
return AsyncQueryEnumerator(newQueryable, async);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
@ -10,7 +9,7 @@ using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -30,7 +29,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
_total = total;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var noPaginationNoOrderQueryable = _primaryOrder.IsAsc ? StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderBy(): StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderByDescending();
@ -38,26 +37,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
var take = StreamMergeContext.Take.GetValueOrDefault();
var realSkip = _total- take- skip;
var tableResult = StreamMergeContext.RouteResults;
var reverseOrderQueryable = noPaginationNoOrderQueryable.Skip((int)realSkip).Take((int)realSkip+take).OrderWithExpression(new List<PropertyOrder>()
StreamMergeContext.ReSetSkip((int)realSkip);
var propertyOrders = new List<PropertyOrder>()
{
new PropertyOrder( _primaryOrder.PropertyExpression,!_primaryOrder.IsAsc)
});
};
StreamMergeContext.ReSetOrders(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+take).OrderWithExpression(propertyOrders);
var enumeratorTasks = tableResult.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(reverseOrderQueryable,routeResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
return AsyncQueryEnumerator(newQueryable,async);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

View File

@ -1,12 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
@ -16,7 +11,7 @@ using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -37,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
_isAsc = isAsc;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
@ -60,19 +55,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
return AsyncQueryEnumerator(newQueryable, async);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();

View File

@ -1,13 +1,11 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -21,21 +19,30 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var routeResult = StreamMergeContext.RouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException();
return new[] {new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator)};
if (async)
{
var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException();
return new[] { new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator) };
}
else
{
var enumerator = DoGetEnumerator(newQueryable);
return new[] { new StreamMergeAsyncEnumerator<TEntity>(enumerator) };
}
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine<TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
}
}
}