对ef5进行了代码优化暂不支持3和2后续会支持

This commit is contained in:
xuejmnet 2021-01-30 17:19:56 +08:00
parent 9ff7b12d33
commit 2cdccdcce7
25 changed files with 1107 additions and 391 deletions

View File

@ -16,7 +16,7 @@ namespace ShardingCore.Core.Internal.PriorityQueues
/// <param name="queue"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static T Peek<T>(this PriorityQueue<T> queue) where T: IComparable<T>
public static T Peek<T>(this PriorityQueue<T> queue)
{
if (queue.IsEmpty())
return default(T);
@ -28,7 +28,7 @@ namespace ShardingCore.Core.Internal.PriorityQueues
/// <param name="queue"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static T Poll<T>(this PriorityQueue<T> queue) where T: IComparable<T>
public static T Poll<T>(this PriorityQueue<T> queue)
{
if (queue.IsEmpty())
return default(T);
@ -44,7 +44,7 @@ namespace ShardingCore.Core.Internal.PriorityQueues
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static bool Offer<T>(this PriorityQueue<T> queue, T element) where T : IComparable<T>
public static bool Offer<T>(this PriorityQueue<T> queue, T element)
{
if (queue.IsFull())
return false;

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
@ -11,5 +13,7 @@ namespace ShardingCore.Core.Internal.RoutingRuleEngines
public interface IRoutingRuleEngineFactory
{
IRouteRuleEngine CreateEngine();
RouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable);
IEnumerable<RouteResult> Route<T>(IQueryable<T> queryable);
}
}

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
@ -12,15 +14,29 @@ namespace ShardingCore.Core.Internal.RoutingRuleEngines
public class RoutingRuleEngineFactory : IRoutingRuleEngineFactory
{
private readonly IRouteRuleEngine _routeRuleEngine;
private readonly IVirtualTableManager _virtualTableManager;
public RoutingRuleEngineFactory(IRouteRuleEngine routeRuleEngine)
public RoutingRuleEngineFactory(IRouteRuleEngine routeRuleEngine,IVirtualTableManager virtualTableManager)
{
_routeRuleEngine = routeRuleEngine;
_virtualTableManager = virtualTableManager;
}
public IRouteRuleEngine CreateEngine()
{
return _routeRuleEngine;
}
public RouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable)
{
return new RouteRuleContext<T>(queryable, _virtualTableManager);
}
public IEnumerable<RouteResult> Route<T>(IQueryable<T> queryable)
{
var engine = CreateEngine();
var routeRuleContext = CreateContext<T>(queryable);
return engine.Route(routeRuleContext);
}
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Core.Internal.StreamMerge.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 30 January 2021 15:27:48
* @Email: 326308290@qq.com
*/
public interface IOrderStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>, IComparable<IOrderStreamMergeAsyncEnumerator<T>>
{
List<IComparable> GetCompares();
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Core.Internal.StreamMerge.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 19:44:52
* @Email: 326308290@qq.com
*/
public interface IStreamMergeAsyncEnumerator<T>:IAsyncEnumerator<T>
{
bool SkipFirst();
bool HasElement();
T ReallyCurrent { get; }
}
}

View File

@ -0,0 +1,101 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Helpers;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 20:01:29
* @Email: 326308290@qq.com
*/
internal class MultiOrderStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeAsyncEnumerator<T>> _sources;
private readonly PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>> _queue;
private IStreamMergeAsyncEnumerator<T> _currentEnumerator;
private bool skipFirst;
public MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;
_queue = new PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>>(sources.Count(),true);
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _sources)
{
var orderStreamEnumerator = new OrderStreamMergeAsyncEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
{
orderStreamEnumerator.SkipFirst();
_queue.Offer(orderStreamEnumerator);
}
}
_currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek();
}
public async ValueTask<bool> MoveNextAsync()
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (await first.MoveNextAsync())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek();
return true;
}
public bool SkipFirst()
{
if (skipFirst)
{
skipFirst = false;
return true;
}
return false;
}
public bool HasElement()
{
return _currentEnumerator != null && _currentEnumerator.HasElement();
}
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
public async ValueTask DisposeAsync()
{
foreach (var source in _sources)
{
await source.DisposeAsync();
}
}
public T Current => skipFirst ? default : _currentEnumerator.Current;
}
}

View File

@ -0,0 +1,72 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 20:55:42
* @Email: 326308290@qq.com
*/
internal class NoPaginationStreamMergeEngine<T>:IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _enumerator;
public NoPaginationStreamMergeEngine(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);;
}
public async ValueTask<bool> MoveNextAsync()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var realSkip = 0;
var realTake = 0;
var has = await _enumerator.MoveNextAsync();
if (has)
{
//获取真实的需要跳过的条数
if (skip.HasValue)
{
if (realSkip < skip)
{
realSkip++;
}
}
if (take.HasValue)
{
realTake++;
if (realTake <= take.Value)
return false;
}
}
return has;
}
public T Current => _enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public async ValueTask DisposeAsync()
{
await _enumerator.DisposeAsync();
}
}
}

View File

@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 19:48:16
* @Email: 326308290@qq.com
*/
internal class OrderStreamMergeAsyncEnumerator<T>:IOrderStreamMergeAsyncEnumerator<T>
{
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _source;
private List<IComparable> _orderValues;
public OrderStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IStreamMergeAsyncEnumerator<T> source)
{
_mergeContext = mergeContext;
_source = source;
SetOrderValues();
}
private void SetOrderValues()
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List<IComparable>(0);
}
public async ValueTask<bool> MoveNextAsync()
{
var has = await _source.MoveNextAsync();
SetOrderValues();
return has;
}
public T Current =>_source.Current;
public bool SkipFirst()
{
return _source.SkipFirst();
}
public bool HasElement()
{
return _source.HasElement();
}
public T ReallyCurrent => _source.ReallyCurrent;
private List<IComparable> GetCurrentOrderValues()
{
if (!_mergeContext.Orders.Any())
return new List<IComparable>(0);
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = _source.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
throw new NotSupportedException($"order by value [{order}] must implements IComparable");
}
return list;
}
public int CompareTo(IOrderStreamMergeAsyncEnumerator<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc);
if (0 != result) {
return result;
}
i++;
}
return 0;
}
public List<IComparable> GetCompares()
{
return _orderValues ?? new List<IComparable>(0);
}
public ValueTask DisposeAsync()
{
return _source.DisposeAsync();
}
}
}

View File

@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
namespace ShardingCore.Core.Internal.StreamMerge.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 23:26:49
* @Email: 326308290@qq.com
*/
internal class StreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
{
private readonly IAsyncEnumerator<T> _source;
private bool skip;
public StreamMergeAsyncEnumerator(IAsyncEnumerator<T> source)
{
_source = source;
skip = true;
}
public async ValueTask<bool> MoveNextAsync()
{
if (skip)
{
skip = false;
return null!=_source.Current;
}
return await _source.MoveNextAsync();
}
public T Current => skip?default:_source.Current;
public bool SkipFirst()
{
if (skip)
{
skip = false;
return true;
}
return false;
}
public bool HasElement()
{
return null != _source.Current;
}
public T ReallyCurrent => _source.Current;
public async ValueTask DisposeAsync()
{
await _source.DisposeAsync();
}
}
}

View File

@ -45,7 +45,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
var shardingDbContext = _mergeContext.CreateDbContext();
parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.Source.ReplaceDbContextQueryable(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable().ReplaceDbContextQueryable(shardingDbContext);
return await EFCoreExecute(newQueryable,routeResult,efQuery);
});

View File

@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 15:40:15
* @Email: 326308290@qq.com
*/
internal class GenericStreamMergeEngine<T> :IStreamMergeEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<DbContext> _parallelDbContexts;
public GenericStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_parallelDbContexts = new List<DbContext>(mergeContext.RouteResults.Count());
}
public static GenericStreamMergeEngine<T> Create<T>(StreamMergeContext<T> mergeContext)
{
return new GenericStreamMergeEngine<T>(mergeContext);
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable, RouteResult routeResult)
{
using (var scope = _mergeContext.CreateScope())
{
scope.ShardingAccessor.ShardingContext = ShardingContext.Create(routeResult);
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
#endif
#if EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
#endif
return enumator;
}
}
public async Task<IEnumerable<IStreamMergeAsyncEnumerator<T>>> GetStreamEnumerator()
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
return Task.Run(async () =>
{
var shardingDbContext = _mergeContext.CreateDbContext();
_parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable().ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable, routeResult);
return new StreamMergeAsyncEnumerator<T>( asyncEnumerator);
});
}).ToArray();
return (await Task.WhenAll(enumeratorTasks)).ToList();
}
public void Dispose()
{
_parallelDbContexts.ForEach(o => o.Dispose());
}
}
}

View File

@ -0,0 +1,107 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 20:37:44
* @Email: 326308290@qq.com
*/
internal class GenericStreamMergeProxyEngine<T> : IDisposable
{
private readonly StreamMergeContext<T> _mergeContext;
private IStreamMergeEngine<T> _streamMergeEngine;
private GenericStreamMergeProxyEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_streamMergeEngine = GenericStreamMergeEngine<T>.Create(mergeContext);
}
public static GenericStreamMergeProxyEngine<T> Create(StreamMergeContext<T> mergeContext)
{
return new GenericStreamMergeProxyEngine<T>(mergeContext);
}
public async Task<List<T>> ToListAsync(int capacity=20)
{
var enumerator = new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
var list = new List<T>(capacity);
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
list.Add(enumerator.Current);
}
return list;
}
public async Task<T> FirstOrDefaultAsync()
{
var enumerator = new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
return enumerator.Current;
}
return default;
}
public async Task<bool> AnyAsync()
{
var enumerator = (IStreamMergeAsyncEnumerator<bool>)new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
if (!enumerator.Current)
return false;
}
return true;
}
public async Task<int> CountAsync()
{
var enumerator = (IStreamMergeAsyncEnumerator<int>)new NoPaginationStreamMergeEngine<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
var result = 0;
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
result += enumerator.Current;
}
return result;
}
public void Dispose()
{
_streamMergeEngine.Dispose();
}
}
}

View File

@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 17:55:29
* @Email: 326308290@qq.com
*/
internal class ListStreamMergeProxyEngine<T>:IDisposable
{
private readonly StreamMergeContext<T> _mergeContext;
private IStreamMergeEngine<T> _streamMergeEngine;
private const int defaultCapacity = 0x10;//默认容量为16
public ListStreamMergeProxyEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_streamMergeEngine = GenericStreamMergeEngine<T>.Create(mergeContext);
}
public async Task<List<T>> ToListAsync()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var list = new List<T>(skip.GetValueOrDefault() + take ?? defaultCapacity);
var realSkip = 0;
var realTake = 0;
var enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, await _streamMergeEngine.GetStreamEnumerator());
#if !EFCORE2
while (await enumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNextAsync())
#endif
{
//获取真实的需要跳过的条数
if (skip.HasValue)
{
if (realSkip < skip)
{
realSkip++;
continue;
}
}
list.Add(enumerator.Current);
if (take.HasValue)
{
realTake++;
if(realTake<=take.Value)
break;
}
}
return list;
}
public void Dispose()
{
_streamMergeEngine.Dispose();
}
}
}

View File

@ -14,5 +14,6 @@ namespace ShardingCore.Core.Internal.StreamMerge
internal interface IStreamMergeContextFactory
{
StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IEnumerable<RouteResult> routeResults);
StreamMergeContext<T> Create<T>(IQueryable<T> queryable);
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
namespace ShardingCore.Core.Internal.StreamMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 29 January 2021 16:11:32
* @Email: 326308290@qq.com
*/
internal interface IStreamMergeEngine<T>:IDisposable
{
Task<IEnumerable<IStreamMergeAsyncEnumerator<T>>> GetStreamEnumerator();
}
}

View File

@ -1,148 +1,148 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 25 January 2021 08:06:12
* @Email: 326308290@qq.com
*/
#if !EFCORE2
internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
private readonly PriorityQueue<OrderMergeItem<T>> _queue;
private bool skipFirst;
private IAsyncEnumerator<T> _currentEnumerator;
public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;
_queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _sources)
{
var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
if (null!=orderMergeItem.GetCurrentEnumerator().Current)
_queue.Offer(orderMergeItem);
}
_currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
}
public async ValueTask<bool> MoveNextAsync()
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (await first.MoveNextAsync())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek().GetCurrentEnumerator();
return true;
}
public async ValueTask DisposeAsync()
{
foreach (var source in _sources)
{
await source.DisposeAsync();
}
}
public T Current => _currentEnumerator.Current;
}
#endif
#if EFCORE2
internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
private readonly PriorityQueue<OrderMergeItem<T>> _queue;
private bool skipFirst;
private IAsyncEnumerator<T> _currentEnumerator;
public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext, List<IAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;
_queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _sources)
{
var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
if (null!=orderMergeItem.GetCurrentEnumerator().Current)
_queue.Offer(orderMergeItem);
}
_currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
}
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (await first.MoveNextAsync())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek().GetCurrentEnumerator();
return true;
}
public void Dispose()
{
foreach (var source in _sources)
{
source.Dispose();
}
}
public T Current => _currentEnumerator.Current;
}
#endif
}
// using System.Collections.Generic;
// using System.Linq;
// using System.Threading;
// using System.Threading.Tasks;
// using ShardingCore.Core.Internal.PriorityQueues;
//
// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: Monday, 25 January 2021 08:06:12
// * @Email: 326308290@qq.com
// */
// #if !EFCORE2
// internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
// {
// private readonly StreamMergeContext<T> _mergeContext;
// private readonly List<IAsyncEnumerator<T>> _sources;
// private readonly PriorityQueue<OrderMergeItem<T>> _queue;
// private bool skipFirst;
// private IAsyncEnumerator<T> _currentEnumerator;
//
// public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
// {
// _mergeContext = mergeContext;
// _sources = sources;
// _queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
// skipFirst = true;
// SetOrderEnumerator();
// }
//
// private void SetOrderEnumerator()
// {
// foreach (var source in _sources)
// {
// var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
// _queue.Offer(orderMergeItem);
// }
// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
// }
//
// public async ValueTask<bool> MoveNextAsync()
// {
// if (_queue.IsEmpty())
// return false;
// if (skipFirst)
// {
// skipFirst = false;
// return true;
// }
//
// var first = _queue.Poll();
// if (await first.MoveNextAsync())
// {
// _queue.Offer(first);
// }
//
// if (_queue.IsEmpty())
// {
// return false;
// }
//
// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
// return true;
// }
//
// public async ValueTask DisposeAsync()
// {
// foreach (var source in _sources)
// {
// await source.DisposeAsync();
// }
// }
//
// public T Current => _currentEnumerator.Current;
// }
// #endif
// #if EFCORE2
//
// internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
// {
// private readonly StreamMergeContext<T> _mergeContext;
// private readonly List<IAsyncEnumerator<T>> _sources;
// private readonly PriorityQueue<OrderMergeItem<T>> _queue;
// private bool skipFirst;
// private IAsyncEnumerator<T> _currentEnumerator;
//
// public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext, List<IAsyncEnumerator<T>> sources)
// {
// _mergeContext = mergeContext;
// _sources = sources;
// _queue = new PriorityQueue<OrderMergeItem<T>>(sources.Count);
// skipFirst = true;
// SetOrderEnumerator();
// }
//
// private void SetOrderEnumerator()
// {
// foreach (var source in _sources)
// {
// var orderMergeItem = new OrderMergeItem<T>(_mergeContext, source);
// if (null!=orderMergeItem.GetCurrentEnumerator().Current)
// _queue.Offer(orderMergeItem);
// }
//
// _currentEnumerator = _queue.IsEmpty() ? _sources.FirstOrDefault() : _queue.Peek().GetCurrentEnumerator();
// }
//
// public async Task<bool> MoveNext(CancellationToken cancellationToken)
// {
// if (_queue.IsEmpty())
// return false;
// if (skipFirst)
// {
// skipFirst = false;
// return true;
// }
//
// var first = _queue.Poll();
// if (await first.MoveNextAsync())
// {
// _queue.Offer(first);
// }
//
// if (_queue.IsEmpty())
// {
// return false;
// }
//
// _currentEnumerator = _queue.Peek().GetCurrentEnumerator();
// return true;
// }
//
// public void Dispose()
// {
// foreach (var source in _sources)
// {
// source.Dispose();
// }
// }
//
//
// public T Current => _currentEnumerator.Current;
// }
// #endif
// }

View File

@ -1,153 +1,158 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 25 January 2021 11:33:57
* @Email: 326308290@qq.com
*/
#if !EFCORE2
internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IAsyncDisposable
{
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IAsyncEnumerator<T> _source;
private List<IComparable> _orderValues;
public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
{
_mergeContext = mergeContext;
_source = source;
SetOrderValues(null!=GetCurrentEnumerator().Current);
}
public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
private void SetOrderValues(bool hasElement)
{
_orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
}
public async Task<bool> MoveNextAsync()
{
var has = await _source.MoveNextAsync();
SetOrderValues(has);
return has;
}
private List<IComparable> GetCurrentOrderValues()
{
if (!_mergeContext.Orders.Any())
return new List<IComparable>(0);
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
throw new NotSupportedException($"order by value [{order}] must implements IComparable");
}
return list;
}
public int CompareTo(OrderMergeItem<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
if (0 != result) {
return result;
}
i++;
}
return 0;
}
public ValueTask DisposeAsync()
{
return _source.DisposeAsync();
}
}
#endif
#if EFCORE2
internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IDisposable
{
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IAsyncEnumerator<T> _source;
private List<IComparable> _orderValues;
public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
{
_mergeContext = mergeContext;
_source = source;
SetOrderValues(null!=GetCurrentEnumerator().Current);
}
public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
private void SetOrderValues(bool hasElement)
{
_orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
}
public async Task<bool> MoveNextAsync()
{
var has = await _source.MoveNext();
SetOrderValues(has);
return has;
}
private List<IComparable> GetCurrentOrderValues()
{
if (!_mergeContext.Orders.Any())
return new List<IComparable>(0);
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
throw new NotSupportedException($"order by value [{order}] must implements IComparable");
}
return list;
}
public int CompareTo(OrderMergeItem<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
if (0 != result) {
return result;
}
i++;
}
return 0;
}
public void Dispose()
{
_source.Dispose();
}
}
#endif
}
// using System;
// using System.Collections.Generic;
// using System.Linq;
// using System.Threading.Tasks;
// using ShardingCore.Extensions;
//
// namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: Monday, 25 January 2021 11:33:57
// * @Email: 326308290@qq.com
// */
// #if !EFCORE2
//
// internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IAsyncDisposable
// {
// /// <summary>
// /// 合并数据上下文
// /// </summary>
// private readonly StreamMergeContext<T> _mergeContext;
//
// private readonly IAsyncEnumerator<T> _source;
// private List<IComparable> _orderValues;
//
// public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
// {
// _mergeContext = mergeContext;
// _source = source;
// SetOrderValues(null!=GetCurrentEnumerator().Current);
// }
//
// public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
//
// private void SetOrderValues(bool hasElement)
// {
// _orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
// }
// public async Task<bool> MoveNextAsync()
// {
// var has = await _source.MoveNextAsync();
// SetOrderValues(has);
// return has;
// }
//
// public bool HasElement()
// {
//
// }
//
// private List<IComparable> GetCurrentOrderValues()
// {
// if (!_mergeContext.Orders.Any())
// return new List<IComparable>(0);
// var list = new List<IComparable>(_mergeContext.Orders.Count());
// foreach (var order in _mergeContext.Orders)
// {
// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
// if (value is IComparable comparable)
// list.Add(comparable);
// else
// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
// }
//
// return list;
// }
//
// public int CompareTo(OrderMergeItem<T> other)
// {
// int i = 0;
// foreach (var order in _mergeContext.Orders) {
// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
// if (0 != result) {
// return result;
// }
// i++;
// }
// return 0;
// }
//
// public ValueTask DisposeAsync()
// {
// return _source.DisposeAsync();
// }
// }
// #endif
// #if EFCORE2
//
// internal class OrderMergeItem<T> : IComparable<OrderMergeItem<T>>, IDisposable
// {
// /// <summary>
// /// 合并数据上下文
// /// </summary>
// private readonly StreamMergeContext<T> _mergeContext;
//
// private readonly IAsyncEnumerator<T> _source;
// private List<IComparable> _orderValues;
//
// public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
// {
// _mergeContext = mergeContext;
// _source = source;
// SetOrderValues(null!=GetCurrentEnumerator().Current);
// }
//
// public IAsyncEnumerator<T> GetCurrentEnumerator() => _source;
//
// private void SetOrderValues(bool hasElement)
// {
// _orderValues = hasElement ? GetCurrentOrderValues() : new List<IComparable>(0);
// }
//
// public async Task<bool> MoveNextAsync()
// {
// var has = await _source.MoveNext();
// SetOrderValues(has);
// return has;
// }
//
// private List<IComparable> GetCurrentOrderValues()
// {
// if (!_mergeContext.Orders.Any())
// return new List<IComparable>(0);
// var list = new List<IComparable>(_mergeContext.Orders.Count());
// foreach (var order in _mergeContext.Orders)
// {
// var value = GetCurrentEnumerator().Current.GetValueByExpression(order.PropertyExpression);
// if (value is IComparable comparable)
// list.Add(comparable);
// else
// throw new NotSupportedException($"order by value [{order}] must implements IComparable");
// }
//
// return list;
// }
//
// public int CompareTo(OrderMergeItem<T> other)
// {
// int i = 0;
// foreach (var order in _mergeContext.Orders) {
// int result = CompareHelper.CompareToWith(_orderValues[i], other._orderValues[i], order.IsAsc);
// if (0 != result) {
// return result;
// }
// i++;
// }
// return 0;
// }
//
//
// public void Dispose()
// {
// _source.Dispose();
// }
// }
// #endif
// }

View File

@ -2,6 +2,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.StreamMerge.Abstractions;
using ShardingCore.Core.Internal.StreamMerge.Enumerators;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
@ -16,12 +18,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
{
private const int defaultCapacity = 0x10;//默认容量为16
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
private readonly IStreamMergeAsyncEnumerator<T> _streamMergeAsyncEnumerator;
public StreamMergeListEngine(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
public StreamMergeListEngine(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;
_streamMergeAsyncEnumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);
}
public async Task<List<T>> Execute()
@ -30,14 +32,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
var skip = _mergeContext.Skip;
var take = _mergeContext.Take;
var list = new List<T>(skip.GetValueOrDefault() + take ?? defaultCapacity);
var enumerator=new OrderAsyncEnumerator<T>(_mergeContext,_sources);
var realSkip = 0;
var realTake = 0;
#if !EFCORE2
while (await enumerator.MoveNextAsync())
while (await _streamMergeAsyncEnumerator.MoveNextAsync())
#endif
#if EFCORE2
while (await enumerator.MoveNext())
while (await enumerator.MoveNextAsync())
#endif
{
//获取真实的需要跳过的条数
@ -49,7 +50,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
continue;
}
}
list.Add(enumerator.Current);
list.Add(_streamMergeAsyncEnumerator.Current);
if (take.HasValue)
{
realTake++;

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
@ -28,61 +29,14 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListSourceMerges
_mergeContext = mergeContext;
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable,RouteResult routeResult)
{
using (var scope = _mergeContext.CreateScope())
{
scope.ShardingAccessor.ShardingContext = ShardingContext.Create(routeResult);
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
#endif
#if EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
#endif
return enumator;
}
}
public async Task<List<T>> Execute()
{
//去除分页,获取前Take+Skip数量
var noPageSource = _mergeContext.Source.RemoveTake().RemoveSkip();
if (_mergeContext.Take.HasValue)
noPageSource = noPageSource.Take(_mergeContext.Take.Value + _mergeContext.Skip.GetValueOrDefault());
//从各个分表获取数据
List<DbContext> parallelDbContexts = new List<DbContext>(_mergeContext.RouteResults.Count());
try
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
return Task.Run(async () =>
{
var shardingDbContext = _mergeContext.CreateDbContext();
parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) noPageSource.ReplaceDbContextQueryable(shardingDbContext);
#if !EFCORE2
return await GetAsyncEnumerator(newQueryable,routeResult);
#endif
#if EFCORE2
return await GetAsyncEnumerator(newQueryable,routeResult);
#endif
});
}).ToArray();
var enumerators = (await Task.WhenAll(enumeratorTasks)).ToList();
var engine = new StreamMergeListEngine<T>(_mergeContext, enumerators);
return await engine.Execute();
}
finally
{
parallelDbContexts.ForEach(o => o.Dispose());
}
using (var engine =new GenericStreamMergeEngine<T>(_mergeContext))
{
var enumerators = await engine.GetStreamEnumerator();
return await new StreamMergeListEngine<T>(_mergeContext, enumerators).Execute();
}
}
}
}

View File

@ -0,0 +1,36 @@
using System.Linq;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 23:44:24
* @Email: 326308290@qq.com
*/
internal class ReWriteEngine<T>
{
private readonly IQueryable<T> _queryable;
public ReWriteEngine(IQueryable<T> queryable)
{
_queryable = queryable;
}
public ReWriteResult<T> ReWrite()
{
var extraEntry = _queryable.GetExtraEntry();
var skip = extraEntry.Skip;
var take = extraEntry.Take;
var orders = extraEntry.Orders ?? Enumerable.Empty<PropertyOrder>();
//去除分页,获取前Take+Skip数量
var noPageSource = _queryable.RemoveTake().RemoveSkip();
if (take.HasValue)
noPageSource = noPageSource.Take(take.Value + skip.GetValueOrDefault());
return new ReWriteResult<T>(_queryable,noPageSource,skip,take,orders);
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.Internal.Visitors;
namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 23:45:29
* @Email: 326308290@qq.com
*/
internal class ReWriteResult<T>
{
public ReWriteResult(IQueryable<T> originalQueryable, IQueryable<T> reWriteQueryable, int? skip, int? take, IEnumerable<PropertyOrder> orders)
{
OriginalQueryable = originalQueryable;
ReWriteQueryable = reWriteQueryable;
Skip = skip;
Take = take;
Orders = orders;
}
public IQueryable<T> OriginalQueryable { get; }
public IQueryable<T> ReWriteQueryable { get; }
public int? Skip { get; }
public int? Take { get; }
public IEnumerable<PropertyOrder> Orders { get; }
}
}

View File

@ -2,6 +2,7 @@ using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.ReWrite;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.DbContexts;
@ -19,7 +20,8 @@ namespace ShardingCore.Core.Internal.StreamMerge
{
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
private readonly IShardingScopeFactory _shardingScopeFactory;
public IQueryable<T> Source { get; }
private readonly IQueryable<T> _source;
private readonly IQueryable<T> _reWriteSource;
public IEnumerable<RouteResult> RouteResults { get; }
public int? Skip { get; private set; }
public int? Take { get; private set; }
@ -30,12 +32,13 @@ namespace ShardingCore.Core.Internal.StreamMerge
{
_shardingParallelDbContextFactory = shardingParallelDbContextFactory;
_shardingScopeFactory = shardingScopeFactory;
Source = source;
_source = source;
RouteResults = routeResults;
var extraEntry = source.GetExtraEntry();
Skip = extraEntry.Skip;
Take = extraEntry.Take;
Orders = extraEntry.Orders ?? Enumerable.Empty<PropertyOrder>();
var reWriteResult = new ReWriteEngine<T>(source).ReWrite();
Skip = reWriteResult.Skip;
Take = reWriteResult.Take;
Orders = reWriteResult.Orders ?? Enumerable.Empty<PropertyOrder>();
_reWriteSource = reWriteResult.ReWriteQueryable;
}
public DbContext CreateDbContext()
@ -48,5 +51,14 @@ namespace ShardingCore.Core.Internal.StreamMerge
return _shardingScopeFactory.CreateScope();
}
public IQueryable<T> GetReWriteQueryable()
{
return _reWriteSource;
}
public IQueryable<T> GetOriginalQueryable()
{
return _source;
}
}
}

View File

@ -17,15 +17,21 @@ namespace ShardingCore.Core.Internal.StreamMerge
{
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
private readonly IShardingScopeFactory _shardingScopeFactory;
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
public StreamMergeContextFactory(IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
public StreamMergeContextFactory(IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory,IRoutingRuleEngineFactory routingRuleEngineFactory)
{
_shardingParallelDbContextFactory = shardingParallelDbContextFactory;
_shardingScopeFactory = shardingScopeFactory;
_routingRuleEngineFactory = routingRuleEngineFactory;
}
public StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IEnumerable<RouteResult> routeResults)
{
return new StreamMergeContext<T>(queryable, routeResults, _shardingParallelDbContextFactory, _shardingScopeFactory);
}
public StreamMergeContext<T> Create<T>(IQueryable<T> queryable)
{
return new StreamMergeContext<T>(queryable, _routingRuleEngineFactory.Route(queryable), _shardingParallelDbContextFactory, _shardingScopeFactory);
}
}
}

View File

@ -35,21 +35,17 @@ namespace ShardingCore.Core
{
private IQueryable<T> _source;
private bool _autoParseRoute = true;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
private Dictionary<Type, Expression> _routes = new Dictionary<Type, Expression>();
private readonly Dictionary<IVirtualTable, List<string>> _endRoutes = new Dictionary<IVirtualTable, List<string>>();
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
private readonly RouteRuleContext<T> _routeRuleContext;
public ShardingQueryable(IQueryable<T> source)
{
_source = source;
_virtualTableManager = ShardingContainer.Services.GetService<IVirtualTableManager>();
_streamMergeContextFactory = ShardingContainer.Services.GetService<IStreamMergeContextFactory>();
_routingRuleEngineFactory=ShardingContainer.Services.GetService<IRoutingRuleEngineFactory>();
_routeRuleContext=new RouteRuleContext<T>(source,_virtualTableManager);
}
@ -104,9 +100,7 @@ namespace ShardingCore.Core
private StreamMergeContext<T> GetContext()
{
var routeRuleEngine = _routingRuleEngineFactory.CreateEngine();
var routeResults = routeRuleEngine.Route(_routeRuleContext);
return _streamMergeContextFactory.Create(_source, routeResults);
return _streamMergeContextFactory.Create(_source);
}
private async Task<List<TResult>> GetGenericMergeEngine<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
@ -128,20 +122,21 @@ namespace ShardingCore.Core
public async Task<List<T>> ToListAsync()
{
return await new StreamMergeListSourceEngine<T>(GetContext()).Execute();
var context = GetContext();
using (var engine = GenericStreamMergeProxyEngine<T>.Create(context))
{
return await engine.ToListAsync();
}
}
public async Task<T> FirstOrDefaultAsync()
{
var result = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable<T>) queryable));
var q = result.Where(o => o != null).AsQueryable();
var extraEntry = _source.GetExtraEntry();
if (extraEntry.Orders.Any())
q = q.OrderWithExpression(extraEntry.Orders);
return q.FirstOrDefault();
var context = GetContext();
using (var engine = GenericStreamMergeProxyEngine<T>.Create(context))
{
return await engine.FirstOrDefaultAsync();
}
}
public async Task<bool> AnyAsync()

View File

@ -35,10 +35,22 @@ namespace ShardingCore.Test50.MySql
{
var modascs=await _virtualDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToShardingListAsync();
Assert.Equal(100,modascs.Count);
Assert.Equal(100,modascs.Last().Age);
var i = 1;
foreach (var age in modascs)
{
Assert.Equal(i,age.Age);
i++;
}
var moddescs=await _virtualDbContext.Set<SysUserMod>().OrderByDescending(o=>o.Age).ToShardingListAsync();
Assert.Equal(100,moddescs.Count);
Assert.Equal(1,moddescs.Last().Age);
var j = 100;
foreach (var age in moddescs)
{
Assert.Equal(j,age.Age);
j--;
}
}
[Fact]
public async Task ToList_Id_In_Test()
@ -101,8 +113,15 @@ namespace ShardingCore.Test50.MySql
[Fact]
public async Task FirstOrDefault_Order_By_Id_Test()
{
var sysUserModAge=await _virtualDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ShardingFirstOrDefaultAsync();
Assert.True(sysUserModAge!=null&&sysUserModAge.Id=="1");
var sysUserModAgeDesc=await _virtualDbContext.Set<SysUserMod>().OrderByDescending(o=>o.Age).ShardingFirstOrDefaultAsync();
Assert.True(sysUserModAgeDesc!=null&&sysUserModAgeDesc.Id=="100");
var sysUserMod=await _virtualDbContext.Set<SysUserMod>().OrderBy(o=>o.Id).ShardingFirstOrDefaultAsync();
Assert.True(sysUserMod!=null&&sysUserMod.Id=="1");
var sysUserModDesc=await _virtualDbContext.Set<SysUserMod>().OrderByDescending(o=>o.Id).ShardingFirstOrDefaultAsync();
Assert.True(sysUserModDesc!=null&&sysUserModDesc.Id=="99");
var sysUserRange=await _virtualDbContext.Set<SysUserRange>().OrderBy(o=>o.Id).ShardingFirstOrDefaultAsync();
Assert.True(sysUserRange!=null&&sysUserRange.Id=="1");
}