修复重构后的bug 后需要实现group by

This commit is contained in:
xuejiaming 2022-08-07 23:16:16 +08:00
parent 52ed6e0c49
commit a1cc753cd7
26 changed files with 265 additions and 226 deletions

View File

@ -8,6 +8,7 @@ using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using ShardingCore.Sharding.MergeEngines.Enumerables.Base;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerables;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines;
@ -95,7 +96,7 @@ namespace ShardingCore.Sharding.MergeEngines.Enumerables
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor(bool async)
{
throw new System.NotImplementedException();
return new AppendOrderSequenceEnumerableExecutor<TEntity>(GetStreamMergeContext(), async);
}
}
}

View File

@ -92,13 +92,6 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
var resultCount = result.Count;
if (resultCount > 1)
{
throw new ShardingCoreInvalidOperationException(
$"in memory merge result length error:{resultCount}");
}
GetShardingMerger()
.InMemoryMerge(result, routeQueryResults.Select(o => o.MergeResult).ToList());
// MergeParallelExecuteResult(result, , async);

View File

@ -55,9 +55,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
_afterTerminated?.Invoke();
}
public void Register(Action afterTrip)
public void Register(Action afterTerminated)
{
_afterTerminated = afterTrip;
_afterTerminated = afterTerminated;
}
}
}

View File

@ -13,13 +13,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
protected override bool OrderConditionTerminated<TResult>(IEnumerable<TResult> results)
{
//只要有一个是false就拉闸
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult==false);
return results.Any(o => o is false);
}
protected override bool RandomConditionTerminated<TResult>(IEnumerable<TResult> results)
{
//只要有一个是false就拉闸
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult == false);
return results.Any(o => o is false);
}
}
}

View File

@ -11,12 +11,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
}
protected override bool OrderConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
return results.Any(o => o is true);
}
protected override bool RandomConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
return results.Any(o => o is true);
}
}
}

View File

@ -12,12 +12,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
protected override bool OrderConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
return results.Any(o => o is true);
}
protected override bool RandomConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
return results.Any(o => o is true);
}
}
}

View File

@ -1,27 +0,0 @@
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class SingleOrSingleOrDefaultCircuitBreaker : AbstractCircuitBreaker
{
public SingleOrSingleOrDefaultCircuitBreaker(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override bool OrderConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results
.Where(o => o is IRouteQueryResult routeQueryResult && routeQueryResult.HasQueryResult())
.Take(2).Count() > 1;
}
protected override bool RandomConditionTerminated<TResult>(IEnumerable<TResult> results)
{
return results
.Where(o => o is IRouteQueryResult routeQueryResult && routeQueryResult.HasQueryResult())
.Take(2).Count() > 1;
}
}
}

View File

@ -30,7 +30,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
public override ICircuitBreaker CreateCircuitBreaker()
{
return new AnyElementCircuitBreaker(GetStreamMergeContext());
var anyCircuitBreaker = new AnyCircuitBreaker(GetStreamMergeContext());
anyCircuitBreaker.Register(() =>
{
Cancel();
});
return anyCircuitBreaker;
}
public override IShardingMerger<bool> GetShardingMerger()

View File

@ -33,7 +33,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
public override ICircuitBreaker CreateCircuitBreaker()
{
return new NoTripCircuitBreaker(GetStreamMergeContext());
var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
public override IShardingMerger<RouteQueryResult<AverageResult<TSelect>>> GetShardingMerger()

View File

@ -32,7 +32,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
public override ICircuitBreaker CreateCircuitBreaker()
{
return new AnyElementCircuitBreaker(GetStreamMergeContext());
var circuitBreaker = new AnyElementCircuitBreaker(GetStreamMergeContext());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
public override IShardingMerger<RouteQueryResult<TResult>> GetShardingMerger()

View File

@ -31,7 +31,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
public override ICircuitBreaker CreateCircuitBreaker()
{
return new AnyElementCircuitBreaker(GetStreamMergeContext());
var circuitBreaker = new AnyElementCircuitBreaker(GetStreamMergeContext());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
public override IShardingMerger<RouteQueryResult<TResult>> GetShardingMerger()

View File

@ -24,18 +24,23 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
/// Author: xjm
/// Created: 2022/5/7 11:13:57
/// Email: 326308290@qq.com
internal class SumMethodWrapExecutor<TEntity> : AbstractMethodExecutor<TEntity>
internal class SumMethodExecutor<TEntity> : AbstractMethodWrapExecutor<TEntity>
{
public SumMethodWrapExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
public SumMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new NoTripCircuitBreaker(GetStreamMergeContext());
var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
public override IShardingMerger<TEntity> GetShardingMerger()
public override IShardingMerger<RouteQueryResult<TEntity>> GetShardingMerger()
{
return new SumMethodShardingMerger<TEntity>();
}

View File

@ -7,7 +7,7 @@ using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
internal abstract class AbstractEnumerableShardingMerger<TEntity>:IShardingMerger<IStreamMergeAsyncEnumerator<TEntity>>
internal abstract class AbstractEnumerableShardingMerger<TEntity> : IShardingMerger<IStreamMergeAsyncEnumerator<TEntity>>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly bool _async;
@ -16,7 +16,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
return _streamMergeContext;
}
public AbstractEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async)
public AbstractEnumerableShardingMerger(StreamMergeContext streamMergeContext, bool async)
{
_streamMergeContext = streamMergeContext;
_async = async;
@ -30,6 +30,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(_streamMergeContext, parallelResults);
}
protected virtual IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
{
return StreamMerge(parallelResults);
}
public virtual void InMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> beforeInMemoryResults, List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
{
@ -42,8 +47,8 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
var parallelCount = parallelResults.Count;
if (parallelCount == 0)
return;
//聚合
if (parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TEntity>> parallelStreamEnumeratorResults)
{
@ -52,26 +57,26 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
mergeAsyncEnumerators.Add(beforeInMemoryResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.Add(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator =StreamMerge(mergeAsyncEnumerators);
var combineStreamMergeAsyncEnumerator = StreamInMemoryMerge(mergeAsyncEnumerators);
// var streamMergeContext = GetStreamMergeContext();
// IStreamMergeAsyncEnumerator<TResult> inMemoryStreamMergeAsyncEnumerator =streamMergeContext.HasGroupQuery()&&streamMergeContext.GroupQueryMemoryMerge()?
// new InMemoryGroupByOrderStreamMergeAsyncEnumerator<TResult>(streamMergeContext,combineStreamMergeAsyncEnumerator,async):
// new InMemoryStreamMergeAsyncEnumerator<TResult>(combineStreamMergeAsyncEnumerator, async);
var inMemoryStreamMergeAsyncEnumerator= new InMemoryStreamMergeAsyncEnumerator<TEntity>(combineStreamMergeAsyncEnumerator, _async);
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TEntity>(combineStreamMergeAsyncEnumerator, _async);
beforeInMemoryResults.Clear();
beforeInMemoryResults.Add(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException(
$"{typeof(TEntity)} is not {typeof(IStreamMergeAsyncEnumerator<TEntity>)}");
}
}
}
}

View File

@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
static AnyMethodShardingMerger()
{
_allShardingMerger = new AllMethodShardingMerger();
_allShardingMerger = new AnyMethodShardingMerger();
}
public static IShardingMerger<bool> Instance => _allShardingMerger;

View File

@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
static ContainsMethodShardingMerger()
{
_allShardingMerger = new AllMethodShardingMerger();
_allShardingMerger = new ContainsMethodShardingMerger();
}
public static IShardingMerger<bool> Instance => _allShardingMerger;

View File

@ -28,7 +28,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
return new RouteQueryResult<int>(null,null,r,true);
}
return new RouteQueryResult<int>(null,null,parallelResults.Sum(o => o.QueryResult),true);
var sum = parallelResults.Sum(o => o.QueryResult);
return new RouteQueryResult<int>(null,null, sum, true);
}
public void InMemoryMerge(List<RouteQueryResult<int>> beforeInMemoryResults, List<RouteQueryResult<int>> parallelResults)

View File

@ -1,5 +1,6 @@
using System.Collections.Generic;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
@ -8,5 +9,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
public DefaultEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async) : base(streamMergeContext,async)
{
}
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
{
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.StreamInMemoryMerge(parallelResults);
}
}
}

View File

@ -11,13 +11,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
}
public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(
List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
{
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0,
GetStreamMergeContext().GetPaginationReWriteTake()); //内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.StreamMerge(parallelResults);
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.StreamInMemoryMerge(parallelResults);
}
}
}

View File

@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
@ -9,38 +11,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
public RouteQueryResult<TResult> StreamMerge(List<RouteQueryResult<TResult>> parallelResults)
{
var result = parallelResults.Where(o => o.HasQueryResult()).Max(o => o.QueryResult);
return new RouteQueryResult<TResult>(null, null, result);
// var resultType = typeof(TEntity);
// if (!resultType.IsNullableType())
// {
// var minTResult = GetMinTResult(parallelResults);
// return new RouteQueryResult<TResult>(null, null, minTResult);
// }
// else
// {
// var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult);
// return new RouteQueryResult<TResult>(null, null, result);
// }
var routeQueryResults = parallelResults.Where(o => o.HasQueryResult()).ToList();
if (routeQueryResults.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
var min = routeQueryResults.Max(o => o.QueryResult);
return new RouteQueryResult<TResult>(null, null, min);
}
// private TResult GetMinTResult(List<RouteQueryResult<TResult>> source)
// {
// var routeQueryResults = source.Where(o => o.HasQueryResult()).ToList();
// if (routeQueryResults.IsEmpty())
// throw new InvalidOperationException("Sequence contains no elements.");
// var min = routeQueryResults.Min(o => o.QueryResult);
//
// return ConvertNumber<TResult>(min);
// }
//
// private TResult ConvertNumber<TNumber>(TNumber number)
// {
// if (number == null)
// return default;
// var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
// return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
// }
public void InMemoryMerge(List<RouteQueryResult<TResult>> beforeInMemoryResults,
List<RouteQueryResult<TResult>> parallelResults)
{

View File

@ -13,81 +13,17 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
public RouteQueryResult<TResult> StreamMerge(List<RouteQueryResult<TResult>> parallelResults)
{
var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult);
return new RouteQueryResult<TResult>(null, null, result);
// var resultType = typeof(TEntity);
// if (!resultType.IsNullableType())
// {
// var minTResult = GetMinTResult(parallelResults);
// return new RouteQueryResult<TResult>(null, null, minTResult);
// }
// else
// {
// var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult);
// return new RouteQueryResult<TResult>(null, null, result);
// }
}
// private TResult GetMinTResult(List<RouteQueryResult<TResult>> source)
// {
// var routeQueryResults = source.Where(o => o.HasQueryResult()).ToList();
// if (routeQueryResults.IsEmpty())
// throw new InvalidOperationException("Sequence contains no elements.");
// var min = routeQueryResults.Min(o => o.QueryResult);
//
// return ConvertNumber<TResult>(min);
// }
//
// private TResult ConvertNumber<TNumber>(TNumber number)
// {
// if (number == null)
// return default;
// var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
// return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
// }
var routeQueryResults = parallelResults.Where(o => o.HasQueryResult()).ToList();
if (routeQueryResults.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
var min = routeQueryResults.Min(o => o.QueryResult);
return new RouteQueryResult<TResult>(null, null, min);
}
public void InMemoryMerge(List<RouteQueryResult<TResult>> beforeInMemoryResults,
List<RouteQueryResult<TResult>> parallelResults)
{
beforeInMemoryResults.AddRange(parallelResults);
}
}
}
//
// var resultType = typeof(TEntity);
// if (!resultType.IsNullableType())
// {
// if (typeof(decimal) == resultType)
// {
// var result = await base.ExecuteAsync<decimal?>(cancellationToken);
// return GetMinTResult<decimal?>(result);
// }
// if (typeof(float) == resultType)
// {
// var result = await base.ExecuteAsync<float?>(cancellationToken);
// return GetMinTResult<float?>(result);
// }
// if (typeof(int) == resultType)
// {
// var result = await base.ExecuteAsync<int?>(cancellationToken);
// return GetMinTResult<int?>(result);
// }
// if (typeof(long) == resultType)
// {
// var result = await base.ExecuteAsync<long?>(cancellationToken);
// return GetMinTResult<long?>(result);
// }
// if (typeof(double) == resultType)
// {
// var result = await base.ExecuteAsync<double?>(cancellationToken);
// return GetMinTResult<double?>(result);
// }
//
// throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
// }
// else
// {
// var result = await base.ExecuteAsync<TResult>(cancellationToken);
// return result.Where(o => o.HasQueryResult()).Min(o => o.QueryResult);
// }
}

View File

@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
}
public override IStreamMergeAsyncEnumerator<TEntity> StreamMerge(
protected override IStreamMergeAsyncEnumerator<TEntity> StreamInMemoryMerge(
List<IStreamMergeAsyncEnumerator<TEntity>> parallelResults)
{
if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(GetStreamMergeContext(), parallelResults, 0,
GetStreamMergeContext().GetPaginationReWriteTake());
return base.StreamMerge(parallelResults);
return base.StreamInMemoryMerge(parallelResults);
}
}
}

View File

@ -5,16 +5,17 @@ using System.Linq.Expressions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
{
internal class SumMethodShardingMerger<TEntity> : IShardingMerger<TEntity>
internal class SumMethodShardingMerger<TEntity> : IShardingMerger<RouteQueryResult<TEntity>>
{
private TEntity GetSumResult<TInnerSelect>(List<TInnerSelect> source)
{
if (source.IsEmpty())
return default;
var sum = source.AsQueryable().SumByConstant<TInnerSelect>();
var sum = source.AsQueryable().SumByPropertyName(nameof(RouteQueryResult<TEntity>.QueryResult));
return ConvertSum(sum);
}
@ -30,12 +31,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers
// {
// return GetSumResult(resultList);
// }
public TEntity StreamMerge(List<TEntity> parallelResults)
public RouteQueryResult<TEntity> StreamMerge(List<RouteQueryResult<TEntity>> parallelResults)
{
return GetSumResult(parallelResults);
var sumResult = GetSumResult(parallelResults);
return new RouteQueryResult<TEntity>(null, null, sumResult, true);
}
public void InMemoryMerge(List<TEntity> beforeInMemoryResults, List<TEntity> parallelResults)
public void InMemoryMerge(List<RouteQueryResult<TEntity>> beforeInMemoryResults, List<RouteQueryResult<TEntity>> parallelResults)
{
beforeInMemoryResults.AddRange(parallelResults);
}

View File

@ -32,14 +32,5 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I
var result =await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TResult>>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken);
return result.QueryResult;
}
//
// protected abstract TResult DoMergeResult(List<RouteQueryResult<TResult>> resultList);
//
// protected override IExecutor<TR> CreateExecutor<TR>(bool async)
// {
// return CreateExecutor0(async) as IExecutor<TR>;
// }
//
// protected abstract IExecutor<RouteQueryResult<TResult>> CreateExecutor0(bool async);
}
}

View File

@ -10,6 +10,8 @@ using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ShardingExecutors;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.StreamMergeEngines;
@ -22,43 +24,111 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractMethodEnsureWrapMergeEngine<TResult>
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractBaseMergeEngine, IEnsureMergeResult<TResult>
{
public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override IExecutor<RouteQueryResult<TResult>> CreateExecutor()
public TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
var result = await ExecuteAsync<decimal?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(float) == resultType)
{
var result = await ExecuteAsync<float?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(int) == resultType)
{
var result = await ExecuteAsync<int?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(long) == resultType)
{
var result = await ExecuteAsync<long?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(double) == resultType)
{
var result = await ExecuteAsync<double?>(cancellationToken);
return ConvertNumber(result);
}
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
var result = await ExecuteAsync<TResult>(cancellationToken);
return result;
}
}
private TResult ConvertNumber<TNumber>(TNumber number)
{
if (number == null)
return default;
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
}
private async Task<TR> ExecuteAsync<TR>(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (!GetStreamMergeContext().TryPrepareExecuteContinueQuery(() => default(TR), out var tr))
{
return tr;
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor<TR>();
var result = await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken);
return result.QueryResult;
}
protected IExecutor<RouteQueryResult<TR>> CreateExecutor<TR>()
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return new MaxMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(float) == resultType)
{
return new MaxMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(int) == resultType)
{
return new MaxMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(long) == resultType)
{
return new MaxMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(double) == resultType)
{
return new MaxMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
throw new ShardingCoreException($"cant calc max value, type:[{resultType}]");
}
else
{
return new MaxMethodExecutor<TEntity,TEntity>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
return new MaxMethodExecutor<TEntity,TEntity>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
}
}

View File

@ -10,6 +10,8 @@ using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ShardingExecutors;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.StreamMergeEngines;
@ -22,43 +24,112 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractMethodEnsureWrapMergeEngine<TResult>
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractBaseMergeEngine, IEnsureMergeResult<TResult>
{
public MinAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
public MinAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override IExecutor<RouteQueryResult<TResult>> CreateExecutor()
public TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return new MinMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<decimal?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(float) == resultType)
{
return new MinMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<float?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(int) == resultType)
{
return new MinMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<int?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(long) == resultType)
{
return new MinMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<long?>(cancellationToken);
return ConvertNumber(result);
}
if (typeof(double) == resultType)
{
return new MinMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<double?>(cancellationToken);
return ConvertNumber(result);
}
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
return new MinMethodExecutor<TEntity, TEntity>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TResult>>;
var result = await ExecuteAsync<TResult>(cancellationToken);
return result;
}
}
private TResult ConvertNumber<TNumber>(TNumber number)
{
if (number == null)
return default;
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
}
private async Task<TR> ExecuteAsync<TR>(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (!GetStreamMergeContext().TryPrepareExecuteContinueQuery(() => default(TR), out var tr))
{
return tr;
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor<TR>();
var result = await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken);
return result.QueryResult;
}
protected IExecutor<RouteQueryResult<TR>> CreateExecutor<TR>()
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return new MinMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(float) == resultType)
{
return new MinMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(int) == resultType)
{
return new MinMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(long) == resultType)
{
return new MinMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
if (typeof(double) == resultType)
{
return new MinMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
return new MinMethodExecutor<TEntity, TEntity>(GetStreamMergeContext()) as IExecutor<RouteQueryResult<TR>>;
}
}
}

View File

@ -1,6 +1,7 @@
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
{
@ -11,34 +12,16 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class SumAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractMethodEnsureMergeEngine<TResult>
internal class SumAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractMethodEnsureWrapMergeEngine<TResult>
{
public SumAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
{
}
// private TResult GetSumResult<TInnerSelect>(List<RouteQueryResult<TInnerSelect>> source)
// {
// if (source.IsEmpty())
// return default;
// var sum = source.AsQueryable().SumByPropertyName<TInnerSelect>(nameof(RouteQueryResult<TInnerSelect>.QueryResult));
// return ConvertSum(sum);
// }
// private TResult ConvertSum<TNumber>(TNumber number)
// {
// if (number == null)
// return default;
// var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
// return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
// }
// protected override TResult DoMergeResult(List<RouteQueryResult<TResult>> resultList)
// {
// return GetSumResult(resultList);
// }
protected override IExecutor<TResult> CreateExecutor()
protected override IExecutor<RouteQueryResult<TResult>> CreateExecutor()
{
return new SumMethodWrapExecutor<TResult>(GetStreamMergeContext());
return new SumMethodExecutor<TResult>(GetStreamMergeContext());
}
}
}