diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs index 79a3bcf3..23813a29 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AppendOrderSequenceEnumerableShardingMerger.cs @@ -14,14 +14,26 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers streamMergeContext, async) { } - public override IStreamMergeAsyncEnumerator StreamMerge( List> parallelResults) { if (GetStreamMergeContext().HasGroupQuery()) + { return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } + return new MultiOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); } + + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) + { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } + return StreamMerge(parallelResults); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs index 537de038..6f0837ce 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/LastOrDefaultEnumerableShardingMerger.cs @@ -10,13 +10,5 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers streamMergeContext, async) { } - - protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) - { - if (GetStreamMergeContext().IsPaginationQuery()) - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目 - - return base.StreamInMemoryMerge(parallelResults); - } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs index 30331b97..6386b10f 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs @@ -11,23 +11,10 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { } - // protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge( - // List> parallelResults) - // { - // // if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery()) - // // { - // // var multiAggregateOrderStreamMergeAsyncEnumerator = - // // new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), - // // parallelResults); - // // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), - // // new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, - // // GetStreamMergeContext().GetPaginationReWriteTake()); - // // } - // - // if (GetStreamMergeContext().IsPaginationQuery()) - // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, - // GetStreamMergeContext().GetPaginationReWriteTake()); - // return base.StreamInMemoryMerge(parallelResults); - // } + public override IStreamMergeAsyncEnumerator StreamMerge(List> parallelResults) + { + var streamMergeAsyncEnumerator = base.StreamMerge(parallelResults); + return new InMemoryReverseStreamMergeAsyncEnumerator(streamMergeAsyncEnumerator); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs index 05cd31bd..86384581 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs @@ -25,6 +25,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + return new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); + } return StreamMerge(parallelResults); } }