From b0f110829d18892ded10aa723294d2976b30aca6 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Tue, 9 Aug 2022 13:41:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96group=E5=92=8Creverse?= =?UTF-8?q?=E7=AD=89=E6=9F=A5=E8=AF=A2=E8=81=9A=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ndOrderSequenceEnumerableShardingMerger.cs | 14 ++++++++++- .../LastOrDefaultEnumerableShardingMerger.cs | 8 ------- .../ReverseEnumerableShardingMerger.cs | 23 ++++--------------- .../SequenceEnumerableShardingMerger.cs | 5 ++++ 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs index 79a3bcf3..23813a29 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs @@ -14,14 +14,26 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers streamMergeContext, async) { } - public override IStreamMergeAsyncEnumerator StreamMerge( List> parallelResults) { if (GetStreamMergeContext().HasGroupQuery()) + { return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } + return new MultiOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); } + + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) + { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } + return StreamMerge(parallelResults); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs index 537de038..6f0837ce 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs @@ -10,13 +10,5 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers streamMergeContext, async) { } - - protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) - { - if (GetStreamMergeContext().IsPaginationQuery()) - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目 - - return base.StreamInMemoryMerge(parallelResults); - } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs index 30331b97..6386b10f 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs @@ -11,23 +11,10 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { } - // protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge( - // List> parallelResults) - // { - // // if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery()) - // // { - // // var multiAggregateOrderStreamMergeAsyncEnumerator = - // // new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), - // // parallelResults); - // // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), - // // new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, - // // GetStreamMergeContext().GetPaginationReWriteTake()); - // // } - // - // if (GetStreamMergeContext().IsPaginationQuery()) - // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, - // GetStreamMergeContext().GetPaginationReWriteTake()); - // return base.StreamInMemoryMerge(parallelResults); - // } + public override IStreamMergeAsyncEnumerator StreamMerge(List> parallelResults) + { + var streamMergeAsyncEnumerator = base.StreamMerge(parallelResults); + return new InMemoryReverseStreamMergeAsyncEnumerator(streamMergeAsyncEnumerator); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs index 05cd31bd..86384581 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs @@ -25,6 +25,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } return StreamMerge(parallelResults); } }