From 2f0a6b3af9ef2f6b86a6cafe419fb3f27390c24c Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Thu, 2 Sep 2021 17:33:51 +0800 Subject: [PATCH] sharding page process part --- .../SysUserModPaginationConfiguration.cs | 18 +++ .../Shardings/SysUserModVirtualTableRoute.cs | 6 + .../Abstractions/IShardingPageAccessor.cs | 18 +++ .../Abstractions/IShardingPageManager.cs | 23 +++ .../Core/ShardingPage/ShardingPageAccessor.cs | 28 ++++ .../Core/ShardingPage/ShardingPageContext.cs | 27 ++++ .../Core/ShardingPage/ShardingPageManager.cs | 32 ++++ .../Core/ShardingPage/ShardingPageScope.cs | 40 +++++ .../Abstractions/AbstractVirtualTableRoute.cs | 6 + .../TableRoutes/IVirtualTableRoute.cs | 6 + .../RoutingRuleEngine/RouteResult.cs | 19 ++- .../Core/VirtualTables/IVirtualTable.cs | 9 ++ .../Core/VirtualTables/OneDbVirtualTable.cs | 11 ++ src/ShardingCore/DIExtension.cs | 10 +- src/ShardingCore/Extensions/LinqExtension.cs | 25 ++- .../PaginationBuilder.cs | 28 +++- .../PaginationConfig.cs | 57 +++++++ .../PaginationMatchEnum.cs | 27 +++- .../PaginationMetadata.cs | 30 ++++ .../PaginationOrderPropertyBuilder.cs | 39 +++-- .../Sharding/StreamMergeContext.cs | 5 + .../AverageAsyncInMemoryMergeEngine.cs | 149 +++--------------- .../MaxAsyncInMemoryMergeEngine.cs | 6 +- .../MinAsyncInMemoryMergeEngine.cs | 7 +- .../SumAsyncInMemoryMergeEngine.cs | 119 ++------------ .../AllAsyncInMemoryMergeEngine.cs | 2 +- .../AnyAsyncInMemoryMergeEngine.cs | 7 +- .../AsyncEnumerableStreamMergeEngine.cs | 4 +- .../ContainsAsyncInMemoryMergeEngine.cs | 7 +- .../CountAsyncInMemoryMergeEngine.cs | 10 ++ ...bstractEnumeratorAsyncStreamMergeEngine.cs | 60 +++++++ .../AbstractEnumeratorStreamMergeEngine.cs | 52 ++++++ ...AbstractEnumeratorSyncStreamMergeEngine.cs | 31 ++++ .../IEnumeratorStreamMergeEngine.cs | 16 ++ .../NormalEnumeratorAsyncStreamMergeEngine.cs | 74 +++++++++ ...equenceEnumeratorAsyncStreamMergeEngine.cs | 100 ++++++++++++ .../FirstAsyncInMemoryMergeEngine.cs | 12 +- .../FirstOrDefaultAsyncInMemoryMergeEngine.cs | 12 +- .../LastAsyncInMemoryMergeEngine.cs | 12 +- .../LastOrDefaultAsyncInMemoryMergeEngine.cs | 12 +- .../LongCountAsyncInMemoryMergeEngine.cs | 11 ++ .../SingleAsyncInMemoryMergeEngine.cs | 12 +- ...SingleOrDefaultAsyncInMemoryMergeEngine.cs | 11 +- 43 files changed, 855 insertions(+), 335 deletions(-) create mode 100644 samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs create mode 100644 src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageAccessor.cs create mode 100644 src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageManager.cs create mode 100644 src/ShardingCore/Core/ShardingPage/ShardingPageAccessor.cs create mode 100644 src/ShardingCore/Core/ShardingPage/ShardingPageContext.cs create mode 100644 src/ShardingCore/Core/ShardingPage/ShardingPageManager.cs create mode 100644 src/ShardingCore/Core/ShardingPage/ShardingPageScope.cs create mode 100644 src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs create mode 100644 src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs diff --git a/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs new file mode 100644 index 00000000..74ae3355 --- /dev/null +++ b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Sample.SqlServer.Domain.Entities; +using ShardingCore.Sharding.PaginationConfigurations; + +namespace Sample.SqlServer.Shardings +{ + public class SysUserModPaginationConfiguration : IPaginationConfiguration + { + public void Configure(PaginationBuilder builder) + { + builder.PaginationSequence(o => o.Id) + .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.FirstMatch); + } + } +} diff --git a/samples/Sample.SqlServer/Shardings/SysUserModVirtualTableRoute.cs b/samples/Sample.SqlServer/Shardings/SysUserModVirtualTableRoute.cs index d7667d37..24f85717 100644 --- a/samples/Sample.SqlServer/Shardings/SysUserModVirtualTableRoute.cs +++ b/samples/Sample.SqlServer/Shardings/SysUserModVirtualTableRoute.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq.Expressions; using Sample.SqlServer.Domain.Entities; using ShardingCore.Core.VirtualRoutes; +using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.VirtualRoutes; using ShardingCore.VirtualRoutes.Mods; @@ -25,5 +26,10 @@ namespace Sample.SqlServer.Shardings public SysUserModVirtualTableRoute() : base(2,3) { } + + public override IPaginationConfiguration CreatePaginationConfiguration() + { + return new SysUserModPaginationConfiguration(); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageAccessor.cs b/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageAccessor.cs new file mode 100644 index 00000000..d4c86970 --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageAccessor.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.Core.ShardingPage.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:46:13 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public interface IShardingPageAccessor + { + ShardingPageContext ShardingPageContext { get; set; } + } +} diff --git a/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageManager.cs b/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageManager.cs new file mode 100644 index 00000000..1b511ec3 --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/Abstractions/IShardingPageManager.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.Core.ShardingPage.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:46:37 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public interface IShardingPageManager + { + ShardingPageContext Current { get; } + /// + /// 创建分页scope + /// + /// + ShardingPageScope CreateScope(); + } +} diff --git a/src/ShardingCore/Core/ShardingPage/ShardingPageAccessor.cs b/src/ShardingCore/Core/ShardingPage/ShardingPageAccessor.cs new file mode 100644 index 00000000..dcc7ffeb --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/ShardingPageAccessor.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using ShardingCore.Core.ShardingPage.Abstractions; + +namespace ShardingCore.Core.ShardingPage +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:47:35 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ShardingPageAccessor:IShardingPageAccessor + { + private static AsyncLocal _shardingPageContext = new AsyncLocal(); + + + /// + public ShardingPageContext ShardingPageContext + { + get => _shardingPageContext.Value; + set => _shardingPageContext.Value = value; + } + } +} diff --git a/src/ShardingCore/Core/ShardingPage/ShardingPageContext.cs b/src/ShardingCore/Core/ShardingPage/ShardingPageContext.cs new file mode 100644 index 00000000..7dd73208 --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/ShardingPageContext.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Sharding.StreamMergeEngines; + +namespace ShardingCore.Core.ShardingPage +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:46:55 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ShardingPageContext + { + public ICollection> RouteQueryResults { get; } + private ShardingPageContext() + { + RouteQueryResults = new LinkedList>(); + } + public static ShardingPageContext Create() + { + return new ShardingPageContext(); + } + } +} diff --git a/src/ShardingCore/Core/ShardingPage/ShardingPageManager.cs b/src/ShardingCore/Core/ShardingPage/ShardingPageManager.cs new file mode 100644 index 00000000..602e5496 --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/ShardingPageManager.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Core.ShardingPage.Abstractions; + +namespace ShardingCore.Core.ShardingPage +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:50:13 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ShardingPageManager: IShardingPageManager + { + private readonly IShardingPageAccessor _shardingPageAccessor; + + public ShardingPageManager(IShardingPageAccessor shardingPageAccessor) + { + _shardingPageAccessor = shardingPageAccessor; + } + + public ShardingPageContext Current => _shardingPageAccessor.ShardingPageContext; + public ShardingPageScope CreateScope() + { + var shardingPageScope = new ShardingPageScope(_shardingPageAccessor); + _shardingPageAccessor.ShardingPageContext = ShardingPageContext.Create(); + return shardingPageScope; + } + } +} diff --git a/src/ShardingCore/Core/ShardingPage/ShardingPageScope.cs b/src/ShardingCore/Core/ShardingPage/ShardingPageScope.cs new file mode 100644 index 00000000..a743fb26 --- /dev/null +++ b/src/ShardingCore/Core/ShardingPage/ShardingPageScope.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Core.ShardingPage.Abstractions; + +namespace ShardingCore.Core.ShardingPage +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 13:49:12 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ShardingPageScope : IDisposable + { + + /// + /// 分表配置访问器 + /// + public IShardingPageAccessor ShardingPageAccessor { get; } + + /// + /// 构造函数 + /// + /// + public ShardingPageScope(IShardingPageAccessor shardingPageAccessor) + { + ShardingPageAccessor = shardingPageAccessor; + } + + /// + /// 回收 + /// + public void Dispose() + { + ShardingPageAccessor.ShardingPageContext = null; + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/Abstractions/AbstractVirtualTableRoute.cs b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/Abstractions/AbstractVirtualTableRoute.cs index ace2b114..b0b3de47 100644 --- a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/Abstractions/AbstractVirtualTableRoute.cs +++ b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/Abstractions/AbstractVirtualTableRoute.cs @@ -6,6 +6,7 @@ using ShardingCore.Core.QueryRouteManagers; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Sharding.PaginationConfigurations; namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions { @@ -17,6 +18,11 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions */ public abstract class AbstractVirtualTableRoute : IVirtualTableRoute where T : class, IShardingTable { + public virtual IPaginationConfiguration CreatePaginationConfiguration() + { + return null; + } + public Type ShardingEntityType => typeof(T); /// /// 如何将分表字段转成对应的类型 diff --git a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/IVirtualTableRoute.cs b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/IVirtualTableRoute.cs index cff7f431..372e491d 100644 --- a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/IVirtualTableRoute.cs +++ b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/IVirtualTableRoute.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; using ShardingCore.Core.PhysicTables; +using ShardingCore.Sharding.PaginationConfigurations; namespace ShardingCore.Core.VirtualRoutes.TableRoutes { @@ -44,5 +45,10 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes public interface IVirtualTableRoute : IVirtualTableRoute where T : class, IShardingTable { + /// + /// 返回null就是表示不开启分页配置 + /// + /// + IPaginationConfiguration CreatePaginationConfiguration(); } } \ No newline at end of file diff --git a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/RoutingRuleEngine/RouteResult.cs b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/RoutingRuleEngine/RouteResult.cs index c1f56282..72a05968 100644 --- a/src/ShardingCore/Core/VirtualRoutes/TableRoutes/RoutingRuleEngine/RouteResult.cs +++ b/src/ShardingCore/Core/VirtualRoutes/TableRoutes/RoutingRuleEngine/RouteResult.cs @@ -22,6 +22,23 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine } public ISet ReplaceTables { get; } - + + protected bool Equals(RouteResult other) + { + return Equals(ReplaceTables, other.ReplaceTables); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((RouteResult) obj); + } + + public override int GetHashCode() + { + return (ReplaceTables != null ? ReplaceTables.GetHashCode() : 0); + } } } \ No newline at end of file diff --git a/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs b/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs index e1bc6448..2cb27e91 100644 --- a/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs +++ b/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using ShardingCore.Core.PhysicTables; using ShardingCore.Core.VirtualRoutes; using ShardingCore.Core.VirtualRoutes.TableRoutes; +using ShardingCore.Sharding.PaginationConfigurations; namespace ShardingCore.Core.VirtualTables { @@ -22,6 +23,14 @@ namespace ShardingCore.Core.VirtualTables /// 分表配置 /// ShardingTableConfig ShardingConfig { get; } + /// + /// 分页配置 + /// + PaginationMetadata PaginationMetadata { get; } + /// + /// 是否启用分页配置 + /// + bool EnablePagination => PaginationMetadata != null; /// /// 获取所有的物理表 diff --git a/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs b/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs index 556a33c5..eff6e5fd 100644 --- a/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs +++ b/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs @@ -8,6 +8,7 @@ using ShardingCore.Core.VirtualRoutes; using ShardingCore.Core.VirtualRoutes.TableRoutes; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Utils; namespace ShardingCore.Core.VirtualTables @@ -28,12 +29,22 @@ namespace ShardingCore.Core.VirtualTables public Type EntityType => typeof(T); public ShardingTableConfig ShardingConfig { get; } + + public PaginationMetadata PaginationMetadata { get; } + private readonly List _physicTables = new List(); public OneDbVirtualTable(IVirtualTableRoute virtualTableRoute) { _virtualTableRoute = virtualTableRoute; ShardingConfig = ShardingKeyUtil.Parse(EntityType); + var paginationConfiguration = virtualTableRoute.CreatePaginationConfiguration(); + if (paginationConfiguration != null) + { + PaginationMetadata = new PaginationMetadata(); + var paginationBuilder = new PaginationBuilder(PaginationMetadata); + paginationConfiguration.Configure(paginationBuilder); + } } public List GetAllPhysicTables() diff --git a/src/ShardingCore/DIExtension.cs b/src/ShardingCore/DIExtension.cs index 9e1d262e..2fb7dbb0 100644 --- a/src/ShardingCore/DIExtension.cs +++ b/src/ShardingCore/DIExtension.cs @@ -16,6 +16,8 @@ using ShardingCore.Core.QueryRouteManagers; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.Core.ShardingAccessors; using ShardingCore.Core.ShardingAccessors.Abstractions; +using ShardingCore.Core.ShardingPage; +using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Core.VirtualRoutes; using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions; using ShardingCore.Sharding.ShardingQueryExecutors; @@ -87,9 +89,15 @@ namespace ShardingCore services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + + //route manage services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + + //sharding page + services.AddSingleton(); + services.AddSingleton(); return services; } diff --git a/src/ShardingCore/Extensions/LinqExtension.cs b/src/ShardingCore/Extensions/LinqExtension.cs index d8301075..d0a5adf1 100644 --- a/src/ShardingCore/Extensions/LinqExtension.cs +++ b/src/ShardingCore/Extensions/LinqExtension.cs @@ -61,6 +61,29 @@ namespace ShardingCore.Extensions func(item); } } - + + /// + /// 是否有差异 + /// + /// + /// + /// + public static bool HasDifference(this IEnumerable source) + { + return source.Distinct().Count() > 1; + } + /// + /// 是否有差异 + /// + /// + /// + /// + /// + /// + public static bool HasDifference(this IEnumerable source, Func keySelector) + { + return source.Select(keySelector).Distinct().Count() > 1; + } + } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs index 5cdea843..6859c391 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs @@ -16,6 +16,12 @@ namespace ShardingCore.Sharding.PaginationConfigurations */ public class PaginationBuilder where TEntity:class,IShardingTable { + private readonly PaginationMetadata _metadata; + + public PaginationBuilder(PaginationMetadata metadata) + { + _metadata = metadata; + } /// /// 分页顺序 /// @@ -23,7 +29,27 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// public PaginationOrderPropertyBuilder PaginationSequence(Expression> orderPropertyExpression) { - return new PaginationOrderPropertyBuilder(orderPropertyExpression); + return new PaginationOrderPropertyBuilder(orderPropertyExpression, _metadata); + } + /// + /// 配置当跳过多少条后开始启用只能分页 + /// + /// + /// + public PaginationBuilder ConfigUseShardingPageIfGeSkip(long skip) + { + _metadata.UseShardingPageIfGeSkipAvg = skip; + return this; + } + /// + /// 配置当分表数目小于多少后直接取到内存不在流式处理 + /// + /// + /// + public PaginationBuilder ConfigTakeInMemoryCountIfLe(int count) + { + _metadata.TakeInMemoryCountIfLe = count; + return this; } } } diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs new file mode 100644 index 00000000..25201313 --- /dev/null +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Reflection; +using System.Text; +using Microsoft.EntityFrameworkCore.Infrastructure; + +namespace ShardingCore.Sharding.PaginationConfigurations +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 7:45:55 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class PaginationConfig + { + public PaginationConfig(LambdaExpression orderPropertyExpression, PaginationMatchEnum paginationMatchEnum= PaginationMatchEnum.Owner, IComparer tailComparer=null) + { + OrderPropertyExpression = orderPropertyExpression; + OrderPropertyInfo = orderPropertyExpression.GetPropertyAccess(); + PropertyName = OrderPropertyInfo.Name; + PaginationMatchEnum = paginationMatchEnum; + TailComparer = tailComparer ?? Comparer.Default; + } + + public LambdaExpression OrderPropertyExpression { get; set; } + public IComparer TailComparer { get; set; } + public PaginationMatchEnum PaginationMatchEnum { get; set; } + public PropertyInfo OrderPropertyInfo { get; set; } + /// + /// 如果查询没发现排序就将当前配置追加上去 + /// + public bool AppendIfOrderNone { get; set; } + public string PropertyName { get;} + + + protected bool Equals(PaginationConfig other) + { + return PropertyName == other.PropertyName; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((PaginationConfig) obj); + } + + public override int GetHashCode() + { + return (PropertyName != null ? PropertyName.GetHashCode() : 0); + } + } +} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs index ae67748f..fbea942e 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs @@ -2,15 +2,26 @@ using System; namespace ShardingCore.Sharding.PaginationConfigurations { -/* -* @Author: xjm -* @Description: -* @Date: Wednesday, 01 September 2021 21:27:25 -* @Email: 326308290@qq.com -*/ + /* + * @Author: xjm + * @Description: + * @Date: Wednesday, 01 September 2021 21:27:25 + * @Email: 326308290@qq.com + */ + [Flags] public enum PaginationMatchEnum { - Owner=1, - Named=1<<1 + /// + /// ǵǰ + /// + Owner = 1, + /// + /// ֻҪһͿ + /// + Named = 1 << 1, + /// + /// һƥͿ + /// + FirstMatch = 1 << 2 } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs new file mode 100644 index 00000000..f8968190 --- /dev/null +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.Sharding.PaginationConfigurations +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 7:45:16 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + /// + /// 分页配置元数据 + /// + public class PaginationMetadata + { + public ISet PaginationConfigs = new HashSet(); + /// + /// 配置生效当跳过多少条后 GREATER THAN OR EQUAL + /// + public long UseShardingPageIfGeSkipAvg { get; set; } = 3000L; + /// + /// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL + /// + public int TakeInMemoryCountIfLe { get; set; } = 100; + + } +} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs index 050ae25b..362c89f2 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs @@ -1,28 +1,23 @@ using System; using System.Collections.Generic; using System.Linq.Expressions; -using System.Reflection; -using Microsoft.EntityFrameworkCore.Infrastructure; namespace ShardingCore.Sharding.PaginationConfigurations { -/* -* @Author: xjm -* @Description: -* @Date: Wednesday, 01 September 2021 21:32:53 -* @Email: 326308290@qq.com -*/ + /* + * @Author: xjm + * @Description: + * @Date: Wednesday, 01 September 2021 21:32:53 + * @Email: 326308290@qq.com + */ public class PaginationOrderPropertyBuilder { - private readonly LambdaExpression _orderPropertyExpression; - private IComparer _tailComparer; - private PaginationMatchEnum _paginationMatchEnum; - private PropertyInfo _orderPropertyInfo; + private readonly PaginationConfig _paginationConfig; - public PaginationOrderPropertyBuilder(LambdaExpression orderPropertyExpression) + public PaginationOrderPropertyBuilder(LambdaExpression orderPropertyExpression,PaginationMetadata metadata) { - _orderPropertyExpression = orderPropertyExpression; - _orderPropertyInfo = orderPropertyExpression.GetPropertyAccess(); + _paginationConfig = new PaginationConfig(orderPropertyExpression); + metadata.PaginationConfigs.Add(_paginationConfig); } /// @@ -32,7 +27,8 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// public PaginationOrderPropertyBuilder UseTailCompare(IComparer tailComparer) { - _tailComparer = tailComparer ?? throw new ArgumentException(nameof(tailComparer)); + + _paginationConfig.TailComparer= tailComparer ?? throw new ArgumentException(nameof(tailComparer)); return this; } /// @@ -42,7 +38,16 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// public PaginationOrderPropertyBuilder UseQueryMatch(PaginationMatchEnum paginationMatchEnum) { - _paginationMatchEnum = paginationMatchEnum; + _paginationConfig.PaginationMatchEnum = paginationMatchEnum; + return this; + } + /// + /// 如果查询没发现排序就将当前配置追加上去 + /// + /// + public PaginationOrderPropertyBuilder UseAppendIfOrderNone() + { + _paginationConfig.AppendIfOrderNone = true; return this; } } diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index 77d6a6eb..c310e745 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -104,5 +104,10 @@ namespace ShardingCore.Sharding return this.SelectContext.SelectProperties.Any(o => o.IsAggregateMethod); } + public IShardingDbContext GetShardingDbContext() + { + return _shardingDbContext; + } + } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs index 07b3b6c5..8f54bbbc 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs @@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; @@ -32,120 +33,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines public override TEnsureResult MergeResult() { - if (typeof(decimal) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Average()); - if (result.IsEmpty()) - return default; - var average = result.Sum() / result.Count; - return ConvertSum(average); - } - - if (typeof(decimal?) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; - return ConvertSum(average); - } - - if (typeof(int) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum() / result.Count; - return ConvertSum(average); - } - - if (typeof(int?) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; - return ConvertSum(average); - } - - if (typeof(long) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum() / result.Count; - return ConvertSum(average); - } - - if (typeof(long?) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; - return ConvertSum(average); - } - - if (typeof(double) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - var average = result.Sum() / result.Count; - return ConvertSum(average); - } - - if (typeof(double?) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; - return ConvertSum(average); - } - - if (typeof(float) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum() / result.Count; - return ConvertSum(average); - } - - if (typeof(float?) == typeof(TEnsureResult)) - { - var result = base.Execute( - queryable => ((IQueryable)queryable).Average() - ); - if (result.IsEmpty()) - return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; - return ConvertSum(average); - } - -#if !EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); -#endif -#if EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}"); -#endif + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync( @@ -158,7 +46,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum() / result.Count; + var average = result.Sum(o=>o.QueryResult) / result.Count; return ConvertSum(average); } @@ -169,7 +57,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + var sum = result.Sum(o => o.QueryResult); + var average = sum.HasValue ? sum / result.Count : default; return ConvertSum(average); } @@ -180,7 +69,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum() / result.Count; + var average = result.Sum(o => o.QueryResult) / result.Count; return ConvertSum(average); } @@ -191,7 +80,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + var sum = result.Sum(o => o.QueryResult); + var average = sum.HasValue ? sum / result.Count : default; return ConvertSum(average); } @@ -202,7 +92,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum() / result.Count; + var average = result.Sum(o => o.QueryResult) / result.Count; return ConvertSum(average); } @@ -213,7 +103,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + var sum = result.Sum(o => o.QueryResult); + var average = sum.HasValue ? sum / result.Count : default; return ConvertSum(average); } @@ -222,7 +113,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); - var average = result.Sum() / result.Count; + var average = result.Sum(o => o.QueryResult) / result.Count; return ConvertSum(average); } @@ -233,7 +124,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + var sum = result.Sum(o => o.QueryResult); + var average = sum.HasValue ? sum / result.Count : default; return ConvertSum(average); } @@ -244,7 +136,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum() / result.Count; + var average = result.Sum(o => o.QueryResult) / result.Count; return ConvertSum(average); } @@ -255,18 +147,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + var sum = result.Sum(o => o.QueryResult); + var average = sum.HasValue ? sum / result.Count : default; return ConvertSum(average); } -#if !EFCORE2 throw new ShardingCoreException( - $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); -#endif -#if EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}"); -#endif + $"not support {GetMethodCallExpression().ShardingPrint()} result {typeof(TEnsureResult)}"); } private TEnsureResult ConvertSum(TNumber number) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs index 78777db3..9e004ed5 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs @@ -6,6 +6,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -27,14 +28,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Max()); - return result.Max(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).MaxAsync(cancellationToken), cancellationToken); - return result.Max(); + return result.Max(o=>o.QueryResult); } } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs index c89cf849..cfd843fb 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,14 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Min()); - return result.Min(); + + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).MinAsync(cancellationToken), cancellationToken); - return result.Min(); + return result.Min(o=>o.QueryResult); } } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs index 596ad279..17d0efa6 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs @@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; @@ -30,94 +31,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines public override TEnsureResult MergeResult() { - if (typeof(decimal) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(decimal?) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(int) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(int?) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(long) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(long?) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(double) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(double?) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(float) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } - if (typeof(float?) == typeof(TEnsureResult)) - { - var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); - if (result.IsEmpty()) - return default; - var sum = result.Sum(); - return ConvertSum(sum); - } -#if !EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); -#endif -#if EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}"); -#endif + + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) @@ -127,7 +42,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o=>o.QueryResult); return ConvertSum(sum); } if (typeof(decimal?) == typeof(TEnsureResult)) @@ -135,7 +50,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o=>o.QueryResult); return ConvertSum(sum); } if (typeof(int) == typeof(TEnsureResult)) @@ -143,7 +58,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o=>o.QueryResult); return ConvertSum(sum); } if (typeof(int?) == typeof(TEnsureResult)) @@ -151,7 +66,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(long) == typeof(TEnsureResult)) @@ -159,7 +74,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(long?) == typeof(TEnsureResult)) @@ -167,7 +82,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(double) == typeof(TEnsureResult)) @@ -175,7 +90,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(double?) == typeof(TEnsureResult)) @@ -183,7 +98,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(float) == typeof(TEnsureResult)) @@ -191,7 +106,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } if (typeof(float?) == typeof(TEnsureResult)) @@ -199,18 +114,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var sum = result.Sum(); + var sum = result.Sum(o => o.QueryResult); return ConvertSum(sum); } -#if !EFCORE2 throw new ShardingCoreException( - $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); -#endif -#if EFCORE2 - throw new ShardingCoreException( - $"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}"); -#endif + $"not support {GetMethodCallExpression().ShardingPrint()} result {typeof(TEnsureResult)}"); } private TEnsureResult ConvertSum(TNumber number) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs index f352e024..35ce5b69 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs @@ -38,7 +38,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); - return result.All(o => o); + return result.All(o => o.QueryResult); } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs index 7205749e..a5b9a224 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; @@ -29,16 +30,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override bool MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Any()); - - return result.Any(o => o); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); - return result.Any(o => o); + return result.Any(o => o.QueryResult); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index 79d04cf8..8e3e0354 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -75,7 +75,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines private IAsyncEnumerator GetShardingEnumerator() { - var tableResult = _mergeContext.GetRouteResults(); + var tableResult = _mergeContext.RouteResults; var routeCount = tableResult.Count(); var enumeratorTasks = tableResult.Select(routeResult => { @@ -114,7 +114,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines public IEnumerator GetEnumerator() { - var tableResult = _mergeContext.GetRouteResults(); + var tableResult = _mergeContext.RouteResults; var routeCount = tableResult.Count(); var enumeratorTasks = tableResult.Select(routeResult => { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs index ae7fb0ff..2302f388 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs @@ -4,6 +4,7 @@ using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; @@ -25,16 +26,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override bool MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Contains(GetConstantItem())); - - return result.Any(o => o); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken); - return result.Any(o => o); + return result.Any(o => o.QueryResult); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs index 6e7bc57e..fb63fde3 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; +using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Helpers; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; @@ -18,8 +19,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines */ public class CountAsyncInMemoryMergeEngine : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine { + private readonly IShardingPageManager _shardingPageManager; public CountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { + _shardingPageManager = ShardingContainer.GetService(); } public override int MergeResult() @@ -31,6 +34,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).CountAsync(cancellationToken), cancellationToken); + if (_shardingPageManager.Current != null) + { + foreach (var routeQueryResult in result) + { + _shardingPageManager.Current.RouteQueryResults.Add(new RouteQueryResult(routeQueryResult.RouteResult, routeQueryResult.QueryResult)); + } + } return result.Sum(o=>o.QueryResult); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..44b2285f --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 15:38:05 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public abstract class AbstractEnumeratorAsyncStreamMergeEngine: AbstractEnumeratorStreamMergeEngine + { + public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + { + } + + public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) + { + var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(); + if (dbStreamMergeAsyncEnumerators.IsEmpty()) + throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty"); + return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators); + } + + public abstract IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(); + public abstract IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators); + public async Task> DoGetAsyncEnumerator(IQueryable newQueryable) + { + var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); + await enumator.MoveNextAsync(); + return enumator; + } + public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) + { + var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + var useOriginal = routeCount > 1; + DbContextQueryStore.TryAdd(routeResult,shardingDbContext); + var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + .ReplaceDbContextQueryable(shardingDbContext); + return newQueryable; + } + + public override IEnumerator GetEnumerator() + { + throw new NotImplementedException(); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs new file mode 100644 index 00000000..945474ef --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using Microsoft.EntityFrameworkCore; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Extensions; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 15:35:39 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public abstract class AbstractEnumeratorStreamMergeEngine:IEnumeratorStreamMergeEngine + { + public StreamMergeContext StreamMergeContext { get; } + public ConcurrentDictionary DbContextQueryStore { get; } + + public AbstractEnumeratorStreamMergeEngine(StreamMergeContext streamMergeContext) + { + StreamMergeContext = streamMergeContext; + DbContextQueryStore = new ConcurrentDictionary(); + } + + public abstract IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = new CancellationToken()); + + public abstract IEnumerator GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public void Dispose() + { + if (DbContextQueryStore.IsNotEmpty()) + { + DbContextQueryStore.Values.ForEach(dbContext => + { + dbContext.Dispose(); + }); + } + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs new file mode 100644 index 00000000..94270b0e --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorSyncStreamMergeEngine.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 15:38:13 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class AbstractEnumeratorSyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine + { + public AbstractEnumeratorSyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + { + } + + public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) + { + throw new NotImplementedException(); + } + + public override IEnumerator GetEnumerator() + { + throw new NotImplementedException(); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs new file mode 100644 index 00000000..e454e26f --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 15:15:34 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public interface IEnumeratorStreamMergeEngine : IAsyncEnumerable, IEnumerable, IDisposable + { + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..fa6fa7c5 --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 16:16:12 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class NormalEnumeratorAsyncStreamMergeEngine:AbstractEnumeratorAsyncStreamMergeEngine + { + private readonly bool _multiRouteQuery; + public NormalEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + { + _multiRouteQuery = streamMergeContext.RouteResults.Count() > 1; + } + + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + { + var tableResult = StreamMergeContext.RouteResults; + var routeCount = tableResult.Count(); + var enumeratorTasks = tableResult.Select(routeResult => + { + return Task.Run(async () => + { + try + { + var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); + + var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); + return streamEnumerators; + } + + public override IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) + { + var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + DbContextQueryStore.TryAdd(routeResult, shardingDbContext); + var newQueryable = (IQueryable)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + .ReplaceDbContextQueryable(shardingDbContext); + return newQueryable; + } + + public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (_multiRouteQuery && StreamMergeContext.HasSkipTake()) + return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + if (StreamMergeContext.HasGroupQuery()) + return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..209050a8 --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ShardingCore.Core.ShardingPage.Abstractions; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Core.VirtualTables; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 16:29:06 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class SequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + { + private IShardingPageManager _shardingPageManager; + private IVirtualTableManager _virtualTableManager; + public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + { + _shardingPageManager = ShardingContainer.GetService(); + _virtualTableManager = ShardingContainer.GetService(); + } + + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + { + var routeQueryResults = _shardingPageManager.Current.RouteQueryResults.ToList(); + if (routeQueryResults.Any(o => o.RouteResult.ReplaceTables.Count(p=>p.IsShardingTable())!=1) + || routeQueryResults.HasDifference(o=>o.RouteResult.ReplaceTables.First().EntityType)) + throw new InvalidOperationException($"error sharding page:[{StreamMergeContext.GetOriginalQueryable().Expression.ShardingPrint()}]"); + var shardingEntityType = routeQueryResults[0].RouteResult.ReplaceTables.FirstOrDefault(o=>o.IsShardingTable()).EntityType; + var virtualTable = _virtualTableManager.GetVirtualTable(StreamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType); + if (!virtualTable.EnablePagination) + { + throw new ShardingCoreException("not support Sequence enumerator"); + } + + if (base.StreamMergeContext.Orders.IsEmpty()) + { + var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone); + if (append != null) + { + StreamMergeContext.GetOriginalQueryable().OrderBy("") + } + } + + + var tableResult = StreamMergeContext.RouteResults; + var routeCount = tableResult.Count(); + var enumeratorTasks = tableResult.Select(routeResult => + { + return Task.Run(async () => + { + try + { + var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); + + var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); + return streamEnumerators; + } + + public override IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) + { + var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + DbContextQueryStore.TryAdd(routeResult, shardingDbContext); + var newQueryable = (IQueryable)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + .ReplaceDbContextQueryable(shardingDbContext); + return newQueryable; + } + + public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (_multiRouteQuery && StreamMergeContext.HasSkipTake()) + return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + if (StreamMergeContext.HasGroupQuery()) + return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs index a7ecadaa..58c1fe07 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute(queryable => ((IQueryable)queryable).First()); - var q = result.Where(o => o != null).AsQueryable(); - - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).First(); - - return q.First(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs index a2f7947b..0908796f 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs @@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; @@ -31,20 +32,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).FirstOrDefault()); - var q = result.Where(o => o != null).AsQueryable(); - - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).FirstOrDefault(); - - return q.FirstOrDefault(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs index e0b6e6be..5dc604d1 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,21 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Last()); - var q = result.Where(o => o != null).AsQueryable(); - - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).Last(); - - return q.Last(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs index 97dcf284..e4daaa00 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).LastOrDefault()); - var q = result.Where(o => o != null).AsQueryable(); - - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).LastOrDefault(); - - return q.LastOrDefault(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs index 15b2b9b1..a02cbe5d 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Exceptions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; @@ -25,8 +26,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines */ public class LongCountAsyncInMemoryMergeEngine : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine { + private readonly IShardingPageManager _shardingPageManager; public LongCountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { + _shardingPageManager= ShardingContainer.GetService(); } public override long MergeResult() @@ -39,6 +42,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LongCountAsync(cancellationToken), cancellationToken); + if (_shardingPageManager.Current != null) + { + foreach (var routeQueryResult in result) + { + _shardingPageManager.Current.RouteQueryResults.Add(routeQueryResult); + } + } + return result.Sum(o=>o.QueryResult); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs index 40251fb1..31671927 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Single()); - var q = result.Where(o => o != null).AsQueryable(); - - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).Single(); - - return q.Single(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs index 88c19251..f952b80c 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; @@ -28,20 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override TResult MergeResult() { - var result = base.Execute(queryable => ((IQueryable)queryable).SingleOrDefault()); - var q = result.Where(o => o != null).AsQueryable(); - var streamMergeContext = GetStreamMergeContext(); - if (streamMergeContext.Orders.Any()) - return q.OrderWithExpression(streamMergeContext.Orders).SingleOrDefault(); - - return q.SingleOrDefault(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken); - var q = result.Where(o => o != null).AsQueryable(); + var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); if (streamMergeContext.Orders.Any())