From a1cc753cd7d074049c20f40efe989d2b1c6f0c6d Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Sun, 7 Aug 2022 23:16:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=87=8D=E6=9E=84=E5=90=8E?= =?UTF-8?q?=E7=9A=84bug=20=E5=90=8E=E9=9C=80=E8=A6=81=E5=AE=9E=E7=8E=B0gro?= =?UTF-8?q?up=20by?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AppendOrderSequenceShardingEnumerable.cs | 3 +- .../Abstractions/AbstractExecutor.cs | 7 -- .../CircuitBreakers/AbstractCircuitBreaker.cs | 4 +- .../CircuitBreakers/AllCircuitBreaker.cs | 4 +- .../CircuitBreakers/AnyCircuitBreaker.cs | 4 +- .../CircuitBreakers/ContainsCircuitBreaker.cs | 4 +- .../SingleOrSingleOrDefaultCircuitBreaker.cs | 27 ------ .../Executors/Methods/AnyMethodExecutor.cs | 7 +- .../Methods/AverageMethodWrapExecutor.cs | 7 +- .../Executors/Methods/MaxMethodExecutor.cs | 8 +- .../Executors/Methods/MinMethodExecutor.cs | 8 +- ...odWrapExecutor.cs => SumMethodExecutor.cs} | 13 ++- .../AbstractEnumerableShardingMerger.cs | 25 +++-- .../AnyMethodShardingMerger.cs | 2 +- .../ContainsMethodShardingMerger.cs | 2 +- .../CountMethodShardingMerger.cs | 4 +- .../DefaultEnumerableShardingMerger.cs | 9 ++ .../LastOrDefaultEnumerableShardingMerger.cs | 9 +- .../MaxMethodShardingMerger.cs | 37 ++------ .../MinMethodShardingMerger.cs | 78 ++-------------- .../ReverseEnumerableShardingMerger.cs | 4 +- .../SumMethodShardingMerger.cs | 12 ++- .../AbstractMethodEnsureWrapMergeEngine.cs | 9 -- .../MaxAsyncInMemoryMergeEngine.cs | 88 ++++++++++++++++-- .../MinAsyncInMemoryMergeEngine.cs | 91 +++++++++++++++++-- .../SumAsyncInMemoryMergeEngine.cs | 25 +---- 26 files changed, 265 insertions(+), 226 deletions(-) delete mode 100644 src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/SingleOrSingleOrDefaultCircuitBreaker.cs rename src/ShardingCore/Sharding/MergeEngines/Executors/Methods/{SumMethodWrapExecutor.cs => SumMethodExecutor.cs} (80%) diff --git a/src/ShardingCore/Sharding/MergeEngines/Enumerables/AppendOrderSequenceShardingEnumerable.cs b/src/ShardingCore/Sharding/MergeEngines/Enumerables/AppendOrderSequenceShardingEnumerable.cs index a0378941..22670c1e 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Enumerables/AppendOrderSequenceShardingEnumerable.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Enumerables/AppendOrderSequenceShardingEnumerable.cs @@ -8,6 +8,7 @@ using ShardingCore.Sharding.MergeEngines.Common; using ShardingCore.Sharding.MergeEngines.Common.Abstractions; using ShardingCore.Sharding.MergeEngines.Enumerables.Base; using ShardingCore.Sharding.MergeEngines.Executors.Abstractions; +using ShardingCore.Sharding.MergeEngines.Executors.Enumerables; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.StreamMerge; using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines; @@ -95,7 +96,7 @@ namespace ShardingCore.Sharding.MergeEngines.Enumerables protected override IExecutor> CreateExecutor(bool async) { - throw new System.NotImplementedException(); + return new AppendOrderSequenceEnumerableExecutor(GetStreamMergeContext(), async); } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Abstractions/AbstractExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Abstractions/AbstractExecutor.cs index e1445f9b..d5b80734 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Abstractions/AbstractExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Abstractions/AbstractExecutor.cs @@ -92,13 +92,6 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions //严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY) { - var resultCount = result.Count; - if (resultCount > 1) - { - throw new ShardingCoreInvalidOperationException( - $"in memory merge result length error:{resultCount}"); - } - GetShardingMerger() .InMemoryMerge(result, routeQueryResults.Select(o => o.MergeResult).ToList()); // MergeParallelExecuteResult(result, , async); diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AbstractCircuitBreaker.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AbstractCircuitBreaker.cs index d483194e..d409f3fb 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AbstractCircuitBreaker.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AbstractCircuitBreaker.cs @@ -55,9 +55,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers _afterTerminated?.Invoke(); } - public void Register(Action afterTrip) + public void Register(Action afterTerminated) { - _afterTerminated = afterTrip; + _afterTerminated = afterTerminated; } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AllCircuitBreaker.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AllCircuitBreaker.cs index 441dcc87..157f7f42 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AllCircuitBreaker.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AllCircuitBreaker.cs @@ -13,13 +13,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers protected override bool OrderConditionTerminated(IEnumerable results) { //只要有一个是false就拉闸 - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult==false); + return results.Any(o => o is false); } protected override bool RandomConditionTerminated(IEnumerable results) { //只要有一个是false就拉闸 - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult == false); + return results.Any(o => o is false); } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AnyCircuitBreaker.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AnyCircuitBreaker.cs index 0e03ce1c..aa55a71b 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AnyCircuitBreaker.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/AnyCircuitBreaker.cs @@ -11,12 +11,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers } protected override bool OrderConditionTerminated(IEnumerable results) { - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult); + return results.Any(o => o is true); } protected override bool RandomConditionTerminated(IEnumerable results) { - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult); + return results.Any(o => o is true); } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/ContainsCircuitBreaker.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/ContainsCircuitBreaker.cs index 8068973a..d4d68126 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/ContainsCircuitBreaker.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/ContainsCircuitBreaker.cs @@ -12,12 +12,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers protected override bool OrderConditionTerminated(IEnumerable results) { - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult); + return results.Any(o => o is true); } protected override bool RandomConditionTerminated(IEnumerable results) { - return results.Any(o => o is RouteQueryResult routeQueryResult && routeQueryResult.QueryResult); + return results.Any(o => o is true); } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/SingleOrSingleOrDefaultCircuitBreaker.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/SingleOrSingleOrDefaultCircuitBreaker.cs deleted file mode 100644 index a4a935f3..00000000 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/CircuitBreakers/SingleOrSingleOrDefaultCircuitBreaker.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System.Collections.Generic; -using System.Linq; -using ShardingCore.Sharding.StreamMergeEngines; - -namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers -{ - internal class SingleOrSingleOrDefaultCircuitBreaker : AbstractCircuitBreaker - { - public SingleOrSingleOrDefaultCircuitBreaker(StreamMergeContext streamMergeContext) : base(streamMergeContext) - { - } - - protected override bool OrderConditionTerminated(IEnumerable results) - { - return results - .Where(o => o is IRouteQueryResult routeQueryResult && routeQueryResult.HasQueryResult()) - .Take(2).Count() > 1; - } - - protected override bool RandomConditionTerminated(IEnumerable results) - { - return results - .Where(o => o is IRouteQueryResult routeQueryResult && routeQueryResult.HasQueryResult()) - .Take(2).Count() > 1; - } - } -} diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AnyMethodExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AnyMethodExecutor.cs index 81eefc12..32bc88e4 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AnyMethodExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AnyMethodExecutor.cs @@ -30,7 +30,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods public override ICircuitBreaker CreateCircuitBreaker() { - return new AnyElementCircuitBreaker(GetStreamMergeContext()); + var anyCircuitBreaker = new AnyCircuitBreaker(GetStreamMergeContext()); + anyCircuitBreaker.Register(() => + { + Cancel(); + }); + return anyCircuitBreaker; } public override IShardingMerger GetShardingMerger() diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AverageMethodWrapExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AverageMethodWrapExecutor.cs index 8821e2b0..4722fbda 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AverageMethodWrapExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/AverageMethodWrapExecutor.cs @@ -33,7 +33,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods public override ICircuitBreaker CreateCircuitBreaker() { - return new NoTripCircuitBreaker(GetStreamMergeContext()); + var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext()); + circuitBreaker.Register(() => + { + Cancel(); + }); + return circuitBreaker; } public override IShardingMerger>> GetShardingMerger() diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MaxMethodExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MaxMethodExecutor.cs index d62fee1f..896b7bde 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MaxMethodExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MaxMethodExecutor.cs @@ -32,7 +32,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods public override ICircuitBreaker CreateCircuitBreaker() { - return new AnyElementCircuitBreaker(GetStreamMergeContext()); + + var circuitBreaker = new AnyElementCircuitBreaker(GetStreamMergeContext()); + circuitBreaker.Register(() => + { + Cancel(); + }); + return circuitBreaker; } public override IShardingMerger> GetShardingMerger() diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MinMethodExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MinMethodExecutor.cs index b5b1c538..7d72d585 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MinMethodExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/MinMethodExecutor.cs @@ -31,7 +31,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods public override ICircuitBreaker CreateCircuitBreaker() { - return new AnyElementCircuitBreaker(GetStreamMergeContext()); + + var circuitBreaker = new AnyElementCircuitBreaker(GetStreamMergeContext()); + circuitBreaker.Register(() => + { + Cancel(); + }); + return circuitBreaker; } public override IShardingMerger> GetShardingMerger() diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodWrapExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodExecutor.cs similarity index 80% rename from src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodWrapExecutor.cs rename to src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodExecutor.cs index bfc6dd28..7c98b342 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodWrapExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/Methods/SumMethodExecutor.cs @@ -24,18 +24,23 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods /// Author: xjm /// Created: 2022/5/7 11:13:57 /// Email: 326308290@qq.com - internal class SumMethodWrapExecutor : AbstractMethodExecutor + internal class SumMethodExecutor : AbstractMethodWrapExecutor { - public SumMethodWrapExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext) + public SumMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } public override ICircuitBreaker CreateCircuitBreaker() { - return new NoTripCircuitBreaker(GetStreamMergeContext()); + var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext()); + circuitBreaker.Register(() => + { + Cancel(); + }); + return circuitBreaker; } - public override IShardingMerger GetShardingMerger() + public override IShardingMerger> GetShardingMerger() { return new SumMethodShardingMerger(); } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs index d588e397..762cb064 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs @@ -7,7 +7,7 @@ using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions; namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { - internal abstract class AbstractEnumerableShardingMerger:IShardingMerger> + internal abstract class AbstractEnumerableShardingMerger : IShardingMerger> { private readonly StreamMergeContext _streamMergeContext; private readonly bool _async; @@ -16,7 +16,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { return _streamMergeContext; } - public AbstractEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async) + public AbstractEnumerableShardingMerger(StreamMergeContext streamMergeContext, bool async) { _streamMergeContext = streamMergeContext; _async = async; @@ -30,6 +30,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers return new MultiOrderStreamMergeAsyncEnumerator(_streamMergeContext, parallelResults); } + protected virtual IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) + { + return StreamMerge(parallelResults); + } + public virtual void InMemoryMerge(List> beforeInMemoryResults, List> parallelResults) { @@ -42,8 +47,8 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers var parallelCount = parallelResults.Count; if (parallelCount == 0) return; - - + + //聚合 if (parallelResults is IEnumerable> parallelStreamEnumeratorResults) { @@ -52,26 +57,26 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { mergeAsyncEnumerators.Add(beforeInMemoryResults.First()); } - + foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults) { mergeAsyncEnumerators.Add(parallelStreamEnumeratorResult); } - - var combineStreamMergeAsyncEnumerator =StreamMerge(mergeAsyncEnumerators); + + var combineStreamMergeAsyncEnumerator = StreamInMemoryMerge(mergeAsyncEnumerators); // var streamMergeContext = GetStreamMergeContext(); // IStreamMergeAsyncEnumerator inMemoryStreamMergeAsyncEnumerator =streamMergeContext.HasGroupQuery()&&streamMergeContext.GroupQueryMemoryMerge()? // new InMemoryGroupByOrderStreamMergeAsyncEnumerator(streamMergeContext,combineStreamMergeAsyncEnumerator,async): // new InMemoryStreamMergeAsyncEnumerator(combineStreamMergeAsyncEnumerator, async); - var inMemoryStreamMergeAsyncEnumerator= new InMemoryStreamMergeAsyncEnumerator(combineStreamMergeAsyncEnumerator, _async); + var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator(combineStreamMergeAsyncEnumerator, _async); beforeInMemoryResults.Clear(); beforeInMemoryResults.Add(inMemoryStreamMergeAsyncEnumerator); //合并 return; } - + throw new ShardingCoreInvalidOperationException( $"{typeof(TEntity)} is not {typeof(IStreamMergeAsyncEnumerator)}"); } - } + } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AnyMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AnyMethodShardingMerger.cs index 7fda017b..b939da48 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AnyMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AnyMethodShardingMerger.cs @@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers static AnyMethodShardingMerger() { - _allShardingMerger = new AllMethodShardingMerger(); + _allShardingMerger = new AnyMethodShardingMerger(); } public static IShardingMerger Instance => _allShardingMerger; diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ContainsMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ContainsMethodShardingMerger.cs index adf6950e..f2e5d5df 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ContainsMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ContainsMethodShardingMerger.cs @@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers static ContainsMethodShardingMerger() { - _allShardingMerger = new AllMethodShardingMerger(); + _allShardingMerger = new ContainsMethodShardingMerger(); } public static IShardingMerger Instance => _allShardingMerger; diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/CountMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/CountMethodShardingMerger.cs index 3a8a8625..31ce25ac 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/CountMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/CountMethodShardingMerger.cs @@ -28,7 +28,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers return new RouteQueryResult(null,null,r,true); } - return new RouteQueryResult(null,null,parallelResults.Sum(o => o.QueryResult),true); + + var sum = parallelResults.Sum(o => o.QueryResult); + return new RouteQueryResult(null,null, sum, true); } public void InMemoryMerge(List> beforeInMemoryResults, List> parallelResults) diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs index 14d9a7b8..79c1b2b8 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { @@ -8,5 +9,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers public DefaultEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async) : base(streamMergeContext,async) { } + + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) + { + if (GetStreamMergeContext().IsPaginationQuery()) + return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//ڴۺϷҳֱӻȡskipȡskip+takeĿ + + return base.StreamInMemoryMerge(parallelResults); + } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs index 851a6825..537de038 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs @@ -11,13 +11,12 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { } - public override IStreamMergeAsyncEnumerator StreamMerge( - List> parallelResults) + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) { if (GetStreamMergeContext().IsPaginationQuery()) - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, - GetStreamMergeContext().GetPaginationReWriteTake()); //内存聚合分页不可以直接获取skip必须获取skip+take的数目 - return base.StreamMerge(parallelResults); + return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目 + + return base.StreamInMemoryMerge(parallelResults); } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MaxMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MaxMethodShardingMerger.cs index 86626d10..e35cbc25 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MaxMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MaxMethodShardingMerger.cs @@ -1,5 +1,7 @@ +using System; using System.Collections.Generic; using System.Linq; +using ShardingCore.Extensions; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines; @@ -9,38 +11,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { public RouteQueryResult StreamMerge(List> parallelResults) { - var result = parallelResults.Where(o => o.HasQueryResult()).Max(o => o.QueryResult); - return new RouteQueryResult(null, null, result); - // var resultType = typeof(TEntity); - // if (!resultType.IsNullableType()) - // { - // var minTResult = GetMinTResult(parallelResults); - // return new RouteQueryResult(null, null, minTResult); - // } - // else - // { - // var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult); - // return new RouteQueryResult(null, null, result); - // } + var routeQueryResults = parallelResults.Where(o => o.HasQueryResult()).ToList(); + if (routeQueryResults.IsEmpty()) + throw new InvalidOperationException("Sequence contains no elements."); + var min = routeQueryResults.Max(o => o.QueryResult); + return new RouteQueryResult(null, null, min); } - // private TResult GetMinTResult(List> source) - // { - // var routeQueryResults = source.Where(o => o.HasQueryResult()).ToList(); - // if (routeQueryResults.IsEmpty()) - // throw new InvalidOperationException("Sequence contains no elements."); - // var min = routeQueryResults.Min(o => o.QueryResult); - // - // return ConvertNumber(min); - // } - // - // private TResult ConvertNumber(TNumber number) - // { - // if (number == null) - // return default; - // var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult)); - // return Expression.Lambda>(convertExpr).Compile()(); - // } public void InMemoryMerge(List> beforeInMemoryResults, List> parallelResults) { diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MinMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MinMethodShardingMerger.cs index 8ebb7930..fd7bb051 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MinMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/MinMethodShardingMerger.cs @@ -13,81 +13,17 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { public RouteQueryResult StreamMerge(List> parallelResults) { - var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult); - return new RouteQueryResult(null, null, result); - // var resultType = typeof(TEntity); - // if (!resultType.IsNullableType()) - // { - // var minTResult = GetMinTResult(parallelResults); - // return new RouteQueryResult(null, null, minTResult); - // } - // else - // { - // var result = parallelResults.Where(o => o.HasQueryResult()).Min(o => o.QueryResult); - // return new RouteQueryResult(null, null, result); - // } - } - // private TResult GetMinTResult(List> source) - // { - // var routeQueryResults = source.Where(o => o.HasQueryResult()).ToList(); - // if (routeQueryResults.IsEmpty()) - // throw new InvalidOperationException("Sequence contains no elements."); - // var min = routeQueryResults.Min(o => o.QueryResult); - // - // return ConvertNumber(min); - // } - // - // private TResult ConvertNumber(TNumber number) - // { - // if (number == null) - // return default; - // var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult)); - // return Expression.Lambda>(convertExpr).Compile()(); - // } + var routeQueryResults = parallelResults.Where(o => o.HasQueryResult()).ToList(); + if (routeQueryResults.IsEmpty()) + throw new InvalidOperationException("Sequence contains no elements."); + var min = routeQueryResults.Min(o => o.QueryResult); + return new RouteQueryResult(null, null, min); + } public void InMemoryMerge(List> beforeInMemoryResults, List> parallelResults) { beforeInMemoryResults.AddRange(parallelResults); } } -} - - -// -// var resultType = typeof(TEntity); -// if (!resultType.IsNullableType()) -// { -// if (typeof(decimal) == resultType) -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return GetMinTResult(result); -// } -// if (typeof(float) == resultType) -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return GetMinTResult(result); -// } -// if (typeof(int) == resultType) -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return GetMinTResult(result); -// } -// if (typeof(long) == resultType) -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return GetMinTResult(result); -// } -// if (typeof(double) == resultType) -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return GetMinTResult(result); -// } -// -// throw new ShardingCoreException($"cant calc min value, type:[{resultType}]"); -// } -// else -// { -// var result = await base.ExecuteAsync(cancellationToken); -// return result.Where(o => o.HasQueryResult()).Min(o => o.QueryResult); -// } \ No newline at end of file +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs index ee800448..9aa668e6 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs @@ -11,7 +11,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { } - public override IStreamMergeAsyncEnumerator StreamMerge( + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge( List> parallelResults) { if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery()) @@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers if (GetStreamMergeContext().IsPaginationQuery()) return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake()); - return base.StreamMerge(parallelResults); + return base.StreamInMemoryMerge(parallelResults); } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SumMethodShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SumMethodShardingMerger.cs index 800232c5..31045904 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SumMethodShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SumMethodShardingMerger.cs @@ -5,16 +5,17 @@ using System.Linq.Expressions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators.AggregateExtensions; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines; namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { - internal class SumMethodShardingMerger : IShardingMerger + internal class SumMethodShardingMerger : IShardingMerger> { private TEntity GetSumResult(List source) { if (source.IsEmpty()) return default; - var sum = source.AsQueryable().SumByConstant(); + var sum = source.AsQueryable().SumByPropertyName(nameof(RouteQueryResult.QueryResult)); return ConvertSum(sum); } @@ -30,12 +31,13 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers // { // return GetSumResult(resultList); // } - public TEntity StreamMerge(List parallelResults) + public RouteQueryResult StreamMerge(List> parallelResults) { - return GetSumResult(parallelResults); + var sumResult = GetSumResult(parallelResults); + return new RouteQueryResult(null, null, sumResult, true); } - public void InMemoryMerge(List beforeInMemoryResults, List parallelResults) + public void InMemoryMerge(List> beforeInMemoryResults, List> parallelResults) { beforeInMemoryResults.AddRange(parallelResults); } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs index 934ff500..2a2c25b5 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs @@ -32,14 +32,5 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I var result =await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken); return result.QueryResult; } - // - // protected abstract TResult DoMergeResult(List> resultList); - // - // protected override IExecutor CreateExecutor(bool async) - // { - // return CreateExecutor0(async) as IExecutor; - // } - // - // protected abstract IExecutor> CreateExecutor0(bool async); } } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs index f2c3d97d..70609216 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs @@ -10,6 +10,8 @@ using System.Threading; using System.Threading.Tasks; using ShardingCore.Sharding.MergeEngines.Executors.Abstractions; using ShardingCore.Sharding.MergeEngines.Executors.Methods; +using ShardingCore.Sharding.MergeEngines.ShardingExecutors; +using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge; using ShardingCore.Sharding.StreamMergeEngines; @@ -22,43 +24,111 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - internal class MaxAsyncInMemoryMergeEngine : AbstractMethodEnsureWrapMergeEngine + internal class MaxAsyncInMemoryMergeEngine : AbstractBaseMergeEngine, IEnsureMergeResult { - public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext) + + public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } - protected override IExecutor> CreateExecutor() + public TResult MergeResult() + { + return MergeResultAsync().WaitAndUnwrapException(false); + } + + + public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + + var resultType = typeof(TEntity); + if (!resultType.IsNullableType()) + { + if (typeof(decimal) == resultType) + { + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); + } + if (typeof(float) == resultType) + { + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); + } + if (typeof(int) == resultType) + { + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); + } + if (typeof(long) == resultType) + { + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); + } + if (typeof(double) == resultType) + { + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); + } + + throw new ShardingCoreException($"cant calc min value, type:[{resultType}]"); + } + else + { + var result = await ExecuteAsync(cancellationToken); + return result; + } + } + + private TResult ConvertNumber(TNumber number) + { + if (number == null) + return default; + var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult)); + return Expression.Lambda>(convertExpr).Compile()(); + } + + private async Task ExecuteAsync(CancellationToken cancellationToken = new CancellationToken()) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!GetStreamMergeContext().TryPrepareExecuteContinueQuery(() => default(TR), out var tr)) + { + return tr; + } + var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); + var executor = CreateExecutor(); + var result = await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken); + return result.QueryResult; + } + protected IExecutor> CreateExecutor() { var resultType = typeof(TEntity); if (!resultType.IsNullableType()) { if (typeof(decimal) == resultType) { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } if (typeof(float) == resultType) { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } if (typeof(int) == resultType) { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } if (typeof(long) == resultType) { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } if (typeof(double) == resultType) { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } throw new ShardingCoreException($"cant calc max value, type:[{resultType}]"); } else { - return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; + return new MaxMethodExecutor(GetStreamMergeContext()) as IExecutor>; } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs index f1ac966b..7d06d178 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs @@ -10,6 +10,8 @@ using System.Threading; using System.Threading.Tasks; using ShardingCore.Sharding.MergeEngines.Executors.Abstractions; using ShardingCore.Sharding.MergeEngines.Executors.Methods; +using ShardingCore.Sharding.MergeEngines.ShardingExecutors; +using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge; using ShardingCore.Sharding.StreamMergeEngines; @@ -22,43 +24,112 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - internal class MinAsyncInMemoryMergeEngine : AbstractMethodEnsureWrapMergeEngine + internal class MinAsyncInMemoryMergeEngine : AbstractBaseMergeEngine, IEnsureMergeResult { - public MinAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext) + + public MinAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } - - protected override IExecutor> CreateExecutor() + + public TResult MergeResult() { + return MergeResultAsync().WaitAndUnwrapException(false); + } + + + public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + var resultType = typeof(TEntity); if (!resultType.IsNullableType()) { if (typeof(decimal) == resultType) { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); } if (typeof(float) == resultType) { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); } if (typeof(int) == resultType) { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); } if (typeof(long) == resultType) { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); } if (typeof(double) == resultType) { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return ConvertNumber(result); } throw new ShardingCoreException($"cant calc min value, type:[{resultType}]"); } else { - return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + var result = await ExecuteAsync(cancellationToken); + return result; + } + } + + private TResult ConvertNumber(TNumber number) + { + if (number == null) + return default; + var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult)); + return Expression.Lambda>(convertExpr).Compile()(); + } + + private async Task ExecuteAsync(CancellationToken cancellationToken = new CancellationToken()) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!GetStreamMergeContext().TryPrepareExecuteContinueQuery(() => default(TR), out var tr)) + { + return tr; + } + var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); + var executor = CreateExecutor(); + var result = await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken); + return result.QueryResult; + } + + protected IExecutor> CreateExecutor() + { + var resultType = typeof(TEntity); + if (!resultType.IsNullableType()) + { + if (typeof(decimal) == resultType) + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + } + if (typeof(float) == resultType) + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + } + if (typeof(int) == resultType) + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + } + if (typeof(long) == resultType) + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + } + if (typeof(double) == resultType) + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; + } + + throw new ShardingCoreException($"cant calc min value, type:[{resultType}]"); + } + else + { + return new MinMethodExecutor(GetStreamMergeContext()) as IExecutor>; } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SumAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SumAsyncInMemoryMergeEngine.cs index 10156b94..49d00e88 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SumAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SumAsyncInMemoryMergeEngine.cs @@ -1,6 +1,7 @@ using ShardingCore.Sharding.MergeEngines.Executors.Abstractions; using ShardingCore.Sharding.MergeEngines.Executors.Methods; using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge; +using ShardingCore.Sharding.StreamMergeEngines; namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines { @@ -11,34 +12,16 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - internal class SumAsyncInMemoryMergeEngine : AbstractMethodEnsureMergeEngine + internal class SumAsyncInMemoryMergeEngine : AbstractMethodEnsureWrapMergeEngine { public SumAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext) { } - // private TResult GetSumResult(List> source) - // { - // if (source.IsEmpty()) - // return default; - // var sum = source.AsQueryable().SumByPropertyName(nameof(RouteQueryResult.QueryResult)); - // return ConvertSum(sum); - // } - // private TResult ConvertSum(TNumber number) - // { - // if (number == null) - // return default; - // var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult)); - // return Expression.Lambda>(convertExpr).Compile()(); - // } - // protected override TResult DoMergeResult(List> resultList) - // { - // return GetSumResult(resultList); - // } - protected override IExecutor CreateExecutor() + protected override IExecutor> CreateExecutor() { - return new SumMethodWrapExecutor(GetStreamMergeContext()); + return new SumMethodExecutor(GetStreamMergeContext()); } } }