From b1da6ee5b59f82a20858140ae6d083af1ffd809d Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Tue, 9 Aug 2022 13:36:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96group?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...yGroupByOrderStreamMergeAsyncEnumerator.cs | 4 +- .../Sharding/MergeContexts/GroupByContext.cs | 6 +++ .../MergeContexts/QueryableRewriteEngine.cs | 48 ++++++++++++------- .../AbstractEnumerableShardingMerger.cs | 23 +++++++++ .../DefaultEnumerableShardingMerger.cs | 10 +--- .../ReverseEnumerableShardingMerger.cs | 36 +++++++------- .../SequenceEnumerableShardingMerger.cs | 5 ++ .../Sharding/StreamMergeContext.cs | 2 +- 8 files changed, 87 insertions(+), 47 deletions(-) diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryGroupByOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryGroupByOrderStreamMergeAsyncEnumerator.cs index e6bacbbc..493af5f2 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryGroupByOrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryGroupByOrderStreamMergeAsyncEnumerator.cs @@ -44,7 +44,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync list.Add(streamMergeAsyncEnumerator.GetCurrent()); _inMemoryReallyCount++; } - return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator(); + return list.AsQueryable().OrderWithExpression(_streamMergeContext.GroupByContext.PropertyOrders).GetEnumerator(); } private IEnumerator GetAllRows(IStreamMergeAsyncEnumerator streamMergeAsyncEnumerator) { @@ -60,7 +60,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync _inMemoryReallyCount++; } - return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator(); + return list.AsQueryable().OrderWithExpression(_streamMergeContext.GroupByContext.PropertyOrders).GetEnumerator(); } public bool SkipFirst() diff --git a/src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs b/src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs index 7c2f691a..8d3ba04b 100644 --- a/src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs +++ b/src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs @@ -1,3 +1,4 @@ +using System.Collections.Generic; using System.Linq.Expressions; namespace ShardingCore.Sharding.MergeContexts @@ -18,6 +19,11 @@ namespace ShardingCore.Sharding.MergeContexts /// 是否内存聚合 /// public bool GroupMemoryMerge { get; set; } + public List PropertyOrders { get; } = new List(); + public string GetOrderExpression() + { + return string.Join(",", PropertyOrders); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs b/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs index 565d7c9d..393a18d5 100644 --- a/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs +++ b/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs @@ -104,27 +104,39 @@ namespace ShardingCore.Sharding.MergeContexts } else { - var groupKeys = selectGroupKeyProperties.Select(o=>o.PropertyName).ToHashSet(); + var groupKeys = selectGroupKeyProperties.Select(o => o.PropertyName).ToHashSet(); bool groupMemoryMerge = false; foreach (var propertyOrder in orders) { - if (groupKeys.IsEmpty()) + groupByContext.PropertyOrders.Add(propertyOrder); + if (!groupMemoryMerge && !groupKeys.IsEmpty()) { - break; + if (!groupKeys.Contains(propertyOrder.PropertyExpression)) + { + groupMemoryMerge = true; + } + + groupKeys.Remove(propertyOrder.PropertyExpression); } - if (!groupKeys.Contains(propertyOrder.PropertyExpression)) - { - groupMemoryMerge = true; - break; - } - groupKeys.Remove(propertyOrder.PropertyExpression); } + //判断是否优先group key排序如果不是就是要内存聚合 groupByContext.GroupMemoryMerge = groupMemoryMerge; - - - var sort = string.Join(",", selectGroupKeyProperties.Select(o => $"{o.PropertyName} asc")); - reWriteQueryable = reWriteQueryable.RemoveAnyOrderBy().OrderWithExpression(sort, null); + if (groupByContext.GroupMemoryMerge) + { + if (groupByContext.GroupMemoryMerge) + { + var sort = string.Join(",", selectGroupKeyProperties.Select(o => $"{o.PropertyName} asc")); + reWriteQueryable = reWriteQueryable.RemoveAnyOrderBy().OrderWithExpression(sort, null); + } + + orders.Clear(); + foreach (var orderProperty in selectGroupKeyProperties) + { + orders.AddLast(new PropertyOrder(orderProperty.PropertyName, true, + orderProperty.OwnerType)); + } + } } // else if (!mergeQueryCompilerContext.UseUnionAllMerge()) @@ -188,11 +200,13 @@ namespace ShardingCore.Sharding.MergeContexts //} } - if (mergeQueryCompilerContext.UseUnionAllMerge() & - !mergeQueryCompilerContext.GetShardingDbContext().SupportUnionAllMerge()) + if (mergeQueryCompilerContext.UseUnionAllMerge()) { - throw new ShardingCoreException( - $"if use {nameof(EntityFrameworkShardingQueryableExtension.UseUnionAllMerge)} plz rewrite {nameof(IQuerySqlGeneratorFactory)} with {nameof(IUnionAllMergeQuerySqlGeneratorFactory)} and {nameof(IQueryCompiler)} with {nameof(IUnionAllMergeQueryCompiler)}"); + if (!mergeQueryCompilerContext.GetShardingDbContext().SupportUnionAllMerge()) + { + throw new ShardingCoreException( + $"if use {nameof(EntityFrameworkShardingQueryableExtension.UseUnionAllMerge)} plz rewrite {nameof(IQuerySqlGeneratorFactory)} with {nameof(IUnionAllMergeQuerySqlGeneratorFactory)} and {nameof(IQueryCompiler)} with {nameof(IUnionAllMergeQueryCompiler)}"); + } } return new RewriteResult(combineQueryable, reWriteQueryable); diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs index 762cb064..f97872fe 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/AbstractEnumerableShardingMerger.cs @@ -23,6 +23,20 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers } public virtual IStreamMergeAsyncEnumerator StreamMerge(List> parallelResults) { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator(_streamMergeContext, parallelResults); + //内存按key聚合好之后需要进行重排序按order + var inMemoryGroupByOrderStreamMergeAsyncEnumerator = new InMemoryGroupByOrderStreamMergeAsyncEnumerator(_streamMergeContext,multiAggregateOrderStreamMergeAsyncEnumerator, _async); + if (_streamMergeContext.IsPaginationQuery()) + { + //分页的前提下还需要进行内存分页 + return new PaginationStreamMergeAsyncEnumerator(_streamMergeContext,new[]{inMemoryGroupByOrderStreamMergeAsyncEnumerator}); + } + + return inMemoryGroupByOrderStreamMergeAsyncEnumerator; + } if (_streamMergeContext.IsPaginationQuery()) return new PaginationStreamMergeAsyncEnumerator(_streamMergeContext, parallelResults); if (_streamMergeContext.HasGroupQuery()) @@ -32,6 +46,15 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers protected virtual IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) { + //如果是group in memory merger需要在内存中聚合好所有的 并且最后通过内存聚合在发挥 + if (GetStreamMergeContext().GroupQueryMemoryMerge()) + { + return new MultiAggregateOrderStreamMergeAsyncEnumerator(_streamMergeContext, parallelResults); + } + if (GetStreamMergeContext().IsPaginationQuery()) + { + return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目 + } return StreamMerge(parallelResults); } diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs index 79c1b2b8..6885b4e3 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/DefaultEnumerableShardingMerger.cs @@ -9,13 +9,5 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers public DefaultEnumerableShardingMerger(StreamMergeContext streamMergeContext,bool async) : base(streamMergeContext,async) { } - - protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) - { - if (GetStreamMergeContext().IsPaginationQuery()) - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, GetStreamMergeContext().GetPaginationReWriteTake());//ڴۺϷҳֱӻȡskipȡskip+takeĿ - - return base.StreamInMemoryMerge(parallelResults); - } } -} +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs index 9aa668e6..30331b97 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/ReverseEnumerableShardingMerger.cs @@ -11,23 +11,23 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers { } - protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge( - List> parallelResults) - { - if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery()) - { - var multiAggregateOrderStreamMergeAsyncEnumerator = - new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), - parallelResults); - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), - new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, - GetStreamMergeContext().GetPaginationReWriteTake()); - } - - if (GetStreamMergeContext().IsPaginationQuery()) - return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, - GetStreamMergeContext().GetPaginationReWriteTake()); - return base.StreamInMemoryMerge(parallelResults); - } + // protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge( + // List> parallelResults) + // { + // // if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery()) + // // { + // // var multiAggregateOrderStreamMergeAsyncEnumerator = + // // new MultiAggregateOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), + // // parallelResults); + // // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), + // // new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, + // // GetStreamMergeContext().GetPaginationReWriteTake()); + // // } + // + // if (GetStreamMergeContext().IsPaginationQuery()) + // return new PaginationStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults, 0, + // GetStreamMergeContext().GetPaginationReWriteTake()); + // return base.StreamInMemoryMerge(parallelResults); + // } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs index 12f28a80..05cd31bd 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Executors/ShardingMergers/SequenceEnumerableShardingMerger.cs @@ -22,5 +22,10 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers return new MultiOrderStreamMergeAsyncEnumerator(GetStreamMergeContext(), parallelResults); } + + protected override IStreamMergeAsyncEnumerator StreamInMemoryMerge(List> parallelResults) + { + return StreamMerge(parallelResults); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index 95360546..30a783df 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -198,7 +198,7 @@ namespace ShardingCore.Sharding /// public bool GroupQueryMemoryMerge() { - return this.GroupByContext.GroupMemoryMerge; + return HasGroupQuery()&&this.GroupByContext.GroupMemoryMerge; } public bool IsMergeQuery()