完成同步查询

This commit is contained in:
xuejiaming 2021-08-19 15:08:02 +08:00
parent 138584bbd8
commit 9c7789ca29
32 changed files with 1191 additions and 107 deletions

View File

@ -32,8 +32,22 @@ namespace Sample.SqlServer.Controllers
var resultx2 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync(o => o.Age<=10);
var resultx = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefaultAsync();
var resultx33 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o=>o.Id).FirstOrDefaultAsync();
var resulxxt = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToListAsync();
var result = await _defaultTableDbContext.Set<SysUserMod>().ToListAsync();
var sresultx11231 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981");
var sresultx1121 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Sum(o => o.Age);
var sresultx111 = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == "198");
var sresultx2 = _defaultTableDbContext.Set<SysUserMod>().Count(o => o.Age <= 10);
var sresultx = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefault();
var sresultx33 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault();
var sresultxc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).ToList();
var sresultxasdc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToList();
var sresult = _defaultTableDbContext.Set<SysUserMod>().ToList();
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");
sysUserMod98.Name = "name_update"+new Random().Next(1,99)+"_98";
await _defaultTableDbContext.SaveChangesAsync();

View File

@ -51,34 +51,105 @@ namespace ShardingCore.EFCores
{
return _currentContext;
}
private TResult EnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query,bool async)
{
Type queryEntityType ;
if (async)
queryEntityType= typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
public TResult Execute<TResult>(Expression query)
{
throw new NotImplementedException();
var async = false;
var currentDbContext = GetCurrentDbContext().Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
//如果根表达式为iqueryable表示需要迭代
if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>)))
{
return EnumerableExecute<TResult>(shardingDbContext, query, async);
}
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Last):
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Single):
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.LongCount):
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Any):
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.All):
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Max):
return GenericMergeExecute<TResult>(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Min):
return EnsureMergeExecute<TResult>(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Sum):
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Average):
return EnsureMergeExecute2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Contains):
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]");
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
var async = true;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
var queryEntityType = typeof(TResult).GetGenericArguments()[0];
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
return EnumerableExecute<TResult>(shardingDbContext, query, async);
}
@ -90,35 +161,35 @@ namespace ShardingCore.EFCores
{
case nameof(Enumerable.First):
return GenericMergeExecuteAsync<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last):
return GenericMergeExecuteAsync<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single):
return GenericMergeExecuteAsync<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecuteAsync<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecuteAsync<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecuteAsync<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Max):
return GenericMergeExecuteAsync<TResult>(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return GenericMergeExecute<TResult>(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Min):
return EnsureMergeExecuteAsync<TResult>(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Sum):
return EnsureMergeExecuteAsync2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Average):
return EnsureMergeExecuteAsync2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Contains):
return EnsureMergeExecuteAsync<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
}
}
@ -138,7 +209,7 @@ namespace ShardingCore.EFCores
private TResult GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
private TResult GenericMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
@ -152,14 +223,15 @@ namespace ShardingCore.EFCores
var queryEntityType = query.GetQueryEntityType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IGenericAsyncMergeResult.MergeResultAsync));
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, new object[] { cancellationToken });
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
@ -173,13 +245,14 @@ namespace ShardingCore.EFCores
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync));
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecuteAsync2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
@ -190,15 +263,18 @@ namespace ShardingCore.EFCores
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
else
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync));
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
//private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
//private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
//{
// //Type type = typeof(EnumerableQuery<>);

View File

@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeSync;
namespace ShardingCore.Sharding.Abstractions
{

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:44:33
* @Email: 326308290@qq.com
*/
public interface IOrderStreamMergeEnumerator<T>:IStreamMergeEnumerator<T>, IComparable<IOrderStreamMergeEnumerator<T>>
{
List<IComparable> GetCompares();
}
}

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm

View File

@ -0,0 +1,183 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:43:26
* @Email: 326308290@qq.com
*/
public class MultiAggregateOrderStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeEnumerator<T>> _queue;
private T CurrentValue;
private List<object> CurrentGroupValues;
private bool _skipFirst;
public MultiAggregateOrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> enumerators)
{
_mergeContext = mergeContext;
_enumerators = enumerators;
_queue = new PriorityQueue<IOrderStreamMergeEnumerator<T>>(enumerators.Count());
_skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
{
orderStreamEnumerator.SkipFirst();
_queue.Offer(orderStreamEnumerator);
}
}
//设置第一个元素聚合的属性值
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
}
private List<object> GetCurrentGroupValues(IOrderStreamMergeEnumerator<T> enumerator)
{
var first = enumerator.ReallyCurrent;
return _mergeContext.SelectContext.SelectProperties.Where(o => !o.IsAggregateMethod)
.Select(o => first.GetValueByExpression(o.PropertyName)).ToList();
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
var hasNext = SetCurrentValue();
if (hasNext)
{
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
}
return hasNext;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
private bool EqualWithGroupValues()
{
var current = GetCurrentGroupValues(_queue.Peek());
for (int i = 0; i < CurrentGroupValues.Count; i++)
{
if (!CurrentGroupValues[i].Equals(current[i]))
return false;
}
return true;
}
private bool SetCurrentValue()
{
CurrentValue = default;
var currentValues = new List<T>();
while (EqualWithGroupValues())
{
var current = _queue.Peek().Current;
currentValues.Add(current);
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
break;
}
}
MergeValue(currentValues);
return true;
}
private void MergeValue(List<T> aggregateValues)
{
if (aggregateValues.IsNotEmpty())
{
CurrentValue = aggregateValues.First();
if (aggregateValues.Count > 1)
{
var aggregates = _mergeContext.SelectContext.SelectProperties.Where(o => o.IsAggregateMethod).ToList();
if (aggregates.IsNotEmpty())
{
foreach (var aggregate in aggregates)
{
object aggregateValue = null;
if (aggregate.AggregateMethod == nameof(Queryable.Count))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Sum))
{
aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Max))
{
aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Min))
{
aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName);
}
else if (aggregate.AggregateMethod == nameof(Queryable.Average))
{
aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName);
}
else
{
throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation ");
}
CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue);
}
}
}
}
}
public bool SkipFirst()
{
return true;
}
public bool HasElement()
{
return ReallyCurrent != null;
}
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator?.Dispose();
}
}
public T Current => CurrentValue;
}
}

View File

@ -0,0 +1,110 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:49:09
* @Email: 326308290@qq.com
*/
public class MultiOrderStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeEnumerator<T>> _queue;
private IStreamMergeEnumerator<T> _currentEnumerator;
private bool skipFirst;
public MultiOrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> enumerators)
{
_mergeContext = mergeContext;
_enumerators = enumerators;
_queue = new PriorityQueue<IOrderStreamMergeEnumerator<T>>(enumerators.Count());
skipFirst = true;
SetOrderEnumerator();
}
private void SetOrderEnumerator()
{
foreach (var source in _enumerators)
{
var orderStreamEnumerator = new OrderStreamMergeEnumerator<T>(_mergeContext, source);
if (orderStreamEnumerator.HasElement())
{
orderStreamEnumerator.SkipFirst();
_queue.Offer(orderStreamEnumerator);
}
}
_currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek();
}
public bool MoveNext()
{
if (_queue.IsEmpty())
return false;
if (skipFirst)
{
skipFirst = false;
return true;
}
var first = _queue.Poll();
if (first.MoveNext())
{
_queue.Offer(first);
}
if (_queue.IsEmpty())
{
return false;
}
_currentEnumerator = _queue.Peek();
return true;
}
public void Reset()
{
throw new System.NotImplementedException();
}
object IEnumerator.Current => Current;
public bool SkipFirst()
{
if (skipFirst)
{
skipFirst = false;
return true;
}
return false;
}
public bool HasElement()
{
return _currentEnumerator != null && _currentEnumerator.HasElement();
}
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator?.Dispose();
}
}
public T Current => skipFirst ? default : _currentEnumerator.Current;
}
}

View File

@ -0,0 +1,110 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:46:32
* @Email: 326308290@qq.com
*/
public class OrderStreamMergeEnumerator<T>:IOrderStreamMergeEnumerator<T>
{
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeEnumerator<T> _enumerator;
private List<IComparable> _orderValues;
public OrderStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IStreamMergeEnumerator<T> enumerator)
{
_mergeContext = mergeContext;
_enumerator = enumerator;
SetOrderValues();
}
private void SetOrderValues()
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List<IComparable>(0);
}
public bool MoveNext()
{
var has = _enumerator.MoveNext();
SetOrderValues();
return has;
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current =>_enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
private List<IComparable> GetCurrentOrderValues()
{
if (!_mergeContext.Orders.Any())
return new List<IComparable>(0);
var list = new List<IComparable>(_mergeContext.Orders.Count());
foreach (var order in _mergeContext.Orders)
{
var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression);
if (value is IComparable comparable)
list.Add(comparable);
else
throw new NotSupportedException($"order by value [{order}] must implements IComparable");
}
return list;
}
public int CompareTo(IOrderStreamMergeEnumerator<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc);
if (0 != result) {
return result;
}
i++;
}
return 0;
}
public List<IComparable> GetCompares()
{
return _orderValues ?? new List<IComparable>(0);
}
public void Dispose()
{
_enumerator?.Dispose();
}
}
}

View File

@ -0,0 +1,85 @@
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:39:52
* @Email: 326308290@qq.com
*/
public class PaginationStreamMergeEnumerator<T> : IStreamMergeEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeEnumerator<T> _enumerator;
private readonly int? _skip;
private readonly int? _take;
private int realSkip = 0;
private int realTake = 0;
public PaginationStreamMergeEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_skip = mergeContext.Skip;
_take = mergeContext.Take;
if (_mergeContext.HasGroupQuery())
_enumerator = new MultiAggregateOrderStreamMergeEnumerator<T>(_mergeContext, sources);
else
_enumerator = new MultiOrderStreamMergeEnumerator<T>(_mergeContext, sources);
}
public bool MoveNext()
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
while (_skip.GetValueOrDefault() > this.realSkip)
{
var has = _enumerator.MoveNext();
realSkip++;
if (!has)
return false;
}
var next = _enumerator.MoveNext();
if (next)
{
if (_take.HasValue)
{
realTake++;
if (realTake >= _take.Value)
return false;
}
}
return next;
}
public void Reset()
{
throw new System.NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => _enumerator.Current;
public bool SkipFirst()
{
return _enumerator.SkipFirst();
}
public bool HasElement()
{
return _enumerator.HasElement();
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
public void Dispose()
{
_enumerator?.Dispose();
}
}
}

View File

@ -0,0 +1,63 @@
using System;
using System.Collections;
using System.Collections.Generic;
namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 21:25:50
* @Email: 326308290@qq.com
*/
public class StreamMergeEnumerator<T>:IStreamMergeEnumerator<T>
{
private readonly IEnumerator<T> _source;
private bool skip;
public StreamMergeEnumerator(IEnumerator<T> source)
{
_source = source;
skip = true;
}
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _source.Current;
}
return _source.MoveNext();
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => skip?default:_source.Current;
public bool SkipFirst()
{
if (skip)
{
skip = false;
return true;
}
return false;
}
public bool HasElement()
{
return null != _source.Current;
}
public T ReallyCurrent => _source.Current;
public void Dispose()
{
_source?.Dispose();
}
}
}

View File

@ -16,11 +16,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>,IEnsureAsyncMergeResult<TResult>
public abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>,IEnsureMergeResult<TResult>
{
protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public abstract TResult MergeResult();
public abstract Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -16,13 +16,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity> : AbstractInMemoryAsyncMergeEngine<TEntity>, IGenericAsyncMergeResult
public abstract class AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity> : AbstractInMemoryAsyncMergeEngine<TEntity>, IGenericMergeResult
{
protected AbstractGenericMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public abstract TResult MergeResult<TResult>();
public abstract Task<TResult> MergeResultAsync<TResult>(
CancellationToken cancellationToken = new CancellationToken());
}

View File

@ -5,8 +5,10 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
@ -53,28 +55,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
var tableResult = GetStreamMergeContext().GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
var tail = CheckAndGetTail(routeResult);
return Task.Run(async () =>
{
try
{
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = GetStreamMergeContext().CreateDbContext(tail);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var newFilterQueryable=EFQueryAfterFilter<TResult>(newQueryable);
var query = await efQuery(newFilterQueryable);
return query;
//}
}
catch (Exception e)
{
@ -86,6 +78,42 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
return (await Task.WhenAll(enumeratorTasks)).ToList();
}
public List<TResult> Execute<TResult>(Func<IQueryable, TResult> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var tableResult = GetStreamMergeContext().GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
var tail = CheckAndGetTail(routeResult);
return Task.Run( () =>
{
try
{
var shardingDbContext = GetStreamMergeContext().CreateDbContext(tail);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var newFilterQueryable = EFQueryAfterFilter<TResult>(newQueryable);
var query = efQuery(newFilterQueryable);
return query;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
return Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult().ToList();
}
private string CheckAndGetTail(RouteResult routeResult)
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
return tail;
}
public virtual IQueryable EFQueryAfterFilter<TResult>(IQueryable<TEntity> queryable)
{

View File

@ -13,10 +13,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IEnsureAsyncMergeResult<T>
/// <summary>
/// 确认结果的合并
/// </summary>
/// <typeparam name="T">返回的确认结果类型</typeparam>
public interface IEnsureMergeResult<T>
{
/// <summary>
/// 合并结果
/// </summary>
/// <returns></returns>
T MergeResult();
/// <summary>
///
/// 合并结果
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>

View File

@ -13,11 +13,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IGenericAsyncMergeResult
/// <summary>
/// 非确认结果的合并
/// </summary>
public interface IGenericMergeResult
{
/// <summary>
///
/// 合并结果
/// </summary>
/// <typeparam name="TResult">结果类型</typeparam>
/// <returns></returns>
TResult MergeResult<TResult>();
/// <summary>
/// 合并结果
/// </summary>
/// <typeparam name="TResult">结果类型</typeparam>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken());

View File

@ -30,46 +30,158 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TEnsureResult MergeResult()
{
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<decimal>)queryable).Average());
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<decimal?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<int>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<int?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(long) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<long>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<long?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(double) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<double>)queryable).Average()
);
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<double?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(float) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<float>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<float?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
}
public override async Task<TEnsureResult> MergeResultAsync(
CancellationToken cancellationToken = new CancellationToken())
{
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<decimal>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<decimal>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum()/result.Count;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<decimal?>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<decimal?>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ?result.Sum()/result.Count: default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<int>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<int>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum()/result.Count;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<int?>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<int?>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
@ -80,18 +192,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
if (typeof(long) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<long>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<long>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum()/result.Count;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<long?>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<long?>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
@ -102,16 +214,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
if (typeof(double) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<double>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<double>)queryable).AverageAsync(cancellationToken),
cancellationToken);
var average = result.Sum()/result.Count;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<double?>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<double?>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
@ -122,18 +234,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
if (typeof(float) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<float>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<float>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum()/result.Count;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(
async queryable => await ((IQueryable<float?>) queryable).AverageAsync(cancellationToken),
queryable => ((IQueryable<float?>)queryable).AverageAsync(cancellationToken),
cancellationToken);
if (result.IsEmpty())
return default;

View File

@ -25,9 +25,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Max());
return result.Max();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).MaxAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).MaxAsync(cancellationToken), cancellationToken);
return result.Max();
}
}

View File

@ -26,9 +26,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Min());
return result.Min();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).MinAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).MinAsync(cancellationToken), cancellationToken);
return result.Min();
}
}

View File

@ -28,11 +28,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override async Task<TEnsureResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
public override TEnsureResult MergeResult()
{
if(typeof(decimal)==typeof(TEnsureResult))
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<decimal>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<decimal>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -40,7 +40,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<decimal?>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<decimal?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -48,7 +48,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<int>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<int>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -56,7 +56,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<int?>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<int?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -64,7 +64,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(long) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<long>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<long>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -72,7 +72,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<long?>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<long?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -80,7 +80,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(double) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<double>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<double>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -88,7 +88,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<double?>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<double?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -96,7 +96,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(float) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<float>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<float>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
@ -104,7 +104,94 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<float?>)queryable).SumAsync(cancellationToken), cancellationToken);
var result = base.Execute(queryable => ((IQueryable<float?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
}
public override async Task<TEnsureResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<decimal>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<decimal?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<int>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<int?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(long) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<long>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<long?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(double) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<double>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<double?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(float) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<float>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = await base.ExecuteAsync(queryable => ((IQueryable<float?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();

View File

@ -27,10 +27,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Any());
return result.All(o => o);
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.All(o => o);
}

View File

@ -26,9 +26,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public AnyAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Any());
return result.Any(o => o);
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.Any(o => o);
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@ -8,6 +9,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeSync;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -17,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com
*/
public class AsyncEnumerableStreamMergeEngine<T> :IAsyncEnumerable<T>
public class AsyncEnumerableStreamMergeEngine<T> :IAsyncEnumerable<T>,IEnumerable<T>
{
private readonly StreamMergeContext<T> _mergeContext;
@ -131,5 +133,60 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
}
private IEnumerator<T> GetEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
public IEnumerator<T> GetEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
return Task.Run( () =>
{
try
{
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var enumerator = GetEnumerator(newQueryable);
return new StreamMergeEnumerator<T>(enumerator);
//}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (_mergeContext.HasSkipTake())
return new PaginationStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}

View File

@ -23,9 +23,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Contains(GetConstantItem()));
return result.Any(o => o);
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken);
return result.Any(o => o);
}

View File

@ -21,9 +21,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public CountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override int MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Count());
return result.Sum();
}
public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
return result.Sum();
}

View File

@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute(queryable => ((IQueryable<TResult>)queryable).First());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).First();
return q.First();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).FirstAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -29,9 +29,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).FirstOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).FirstOrDefault();
return q.FirstOrDefault();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -26,10 +26,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Last());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).Last();
return q.Last();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).LastAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).LastOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).LastOrDefault();
return q.LastOrDefault();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -28,10 +28,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override long MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).LongCount());
return result.Sum();
}
public override async Task<long> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).LongCountAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).LongCountAsync(cancellationToken), cancellationToken);
return result.Sum();
}

View File

@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Single());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).Single();
return q.Single();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).SingleAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
var result = base.Execute(queryable => ((IQueryable<TResult>)queryable).SingleOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).SingleOrDefault();
return q.SingleOrDefault();
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();

View File

@ -18,8 +18,4 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<Folder Include="Sharding\Enumerators\StreamMeregeSync" />
</ItemGroup>
</Project>