diff --git a/samples/Sample.SqlServer/Controllers/ValuesController.cs b/samples/Sample.SqlServer/Controllers/ValuesController.cs index d6d37b41..518f37e1 100644 --- a/samples/Sample.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.SqlServer/Controllers/ValuesController.cs @@ -58,15 +58,15 @@ namespace Sample.SqlServer.Controllers - var sresultx11231 = _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981"); - var sresultx1121 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Sum(o => o.Age); - var sresultx111 = _defaultTableDbContext.Set().FirstOrDefault(o => o.Id == "198"); - var sresultx2 = _defaultTableDbContext.Set().Count(o => o.Age <= 10); - var sresultx = _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefault(); - var sresultx33 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault(); - var sresultxc = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).ToList(); - var sresultxasdc = _defaultTableDbContext.Set().Where(o => o.Id == "198").ToList(); - var sresult = _defaultTableDbContext.Set().ToList(); + //var sresultx11231 = _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981"); + //var sresultx1121 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Sum(o => o.Age); + //var sresultx111 = _defaultTableDbContext.Set().FirstOrDefault(o => o.Id == "198"); + //var sresultx2 = _defaultTableDbContext.Set().Count(o => o.Age <= 10); + //var sresultx = _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefault(); + //var sresultx33 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault(); + //var sresultxc = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).ToList(); + //var sresultxasdc = _defaultTableDbContext.Set().Where(o => o.Id == "198").ToList(); + //var sresult = _defaultTableDbContext.Set().ToList(); var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98"); _defaultTableDbContext.Attach(sysUserMod98); @@ -86,5 +86,11 @@ namespace Sample.SqlServer.Controllers } return Ok(); } + [HttpGet] + public async Task Get1([FromQuery] int p,[FromQuery]int s) + { + var shardingPageResultAsync = await _defaultTableDbContext.Set().OrderByDescending(o=>o.DateOfMonth).ToShardingPageAsync(p, s); + return Ok(shardingPageResultAsync); + } } } \ No newline at end of file diff --git a/samples/Sample.SqlServer/DIExtension.cs b/samples/Sample.SqlServer/DIExtension.cs index 7f317646..43d6a1e7 100644 --- a/samples/Sample.SqlServer/DIExtension.cs +++ b/samples/Sample.SqlServer/DIExtension.cs @@ -1,6 +1,8 @@ +using System; using System.Collections.Generic; using System.Linq; using Microsoft.AspNetCore.Builder; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Sample.SqlServer.DbContexts; using Sample.SqlServer.Domain.Entities; @@ -25,14 +27,17 @@ namespace Sample.SqlServer public static void DbSeed(this IApplicationBuilder app) { - using (var scope=app.ApplicationServices.CreateScope()) + using (var scope = app.ApplicationServices.CreateScope()) { - var virtualDbContext =scope.ServiceProvider.GetService(); + var virtualDbContext = scope.ServiceProvider.GetService(); if (!virtualDbContext.Set().Any()) { var ids = Enumerable.Range(1, 1000); var userMods = new List(); + var userSalaries = new List(); var SysTests = new List(); + var beginTime = new DateTime(2020, 1, 1); + var endTime = new DateTime(2021, 12, 1); foreach (var id in ids) { userMods.Add(new SysUserMod() @@ -40,16 +45,38 @@ namespace Sample.SqlServer Id = id.ToString(), Age = id, Name = $"name_{id}", + AgeGroup = Math.Abs(id % 10) }); SysTests.Add(new SysTest() { Id = id.ToString(), UserId = id.ToString() }); + var tempTime = beginTime; + var i = 0; + while (tempTime <= endTime) + { + var dateOfMonth = $@"{tempTime:yyyyMM}"; + userSalaries.Add(new SysUserSalary() + { + Id = $@"{id}{dateOfMonth}", + UserId = id.ToString(), + DateOfMonth = int.Parse(dateOfMonth), + Salary = 700000 + id * 100 * i, + SalaryLong = 700000 + id * 100 * i, + SalaryDecimal = (700000 + id * 100 * i) / 100m, + SalaryDouble = (700000 + id * 100 * i) / 100d, + SalaryFloat = (700000 + id * 100 * i) / 100f + }); + tempTime = tempTime.AddMonths(1); + i++; + } } virtualDbContext.AddRange(userMods); virtualDbContext.AddRange(SysTests); + virtualDbContext.AddRange(userSalaries); + virtualDbContext.SaveChanges(); } } diff --git a/samples/Sample.SqlServer/DbContexts/DefaultShardingDbContext.cs b/samples/Sample.SqlServer/DbContexts/DefaultShardingDbContext.cs index 35540c8a..26940a91 100644 --- a/samples/Sample.SqlServer/DbContexts/DefaultShardingDbContext.cs +++ b/samples/Sample.SqlServer/DbContexts/DefaultShardingDbContext.cs @@ -19,6 +19,7 @@ namespace Sample.SqlServer.DbContexts base.OnModelCreating(modelBuilder); modelBuilder.ApplyConfiguration(new SysUserModMap()); modelBuilder.ApplyConfiguration(new SysTestMap()); + modelBuilder.ApplyConfiguration(new SysUserSalaryMap()); } public override Type ShardingDbContextType => this.GetType(); diff --git a/samples/Sample.SqlServer/DbContexts/DefaultTableDbContext.cs b/samples/Sample.SqlServer/DbContexts/DefaultTableDbContext.cs index 81d24db0..d637b1cb 100644 --- a/samples/Sample.SqlServer/DbContexts/DefaultTableDbContext.cs +++ b/samples/Sample.SqlServer/DbContexts/DefaultTableDbContext.cs @@ -17,6 +17,7 @@ namespace Sample.SqlServer.DbContexts base.OnModelCreating(modelBuilder); modelBuilder.ApplyConfiguration(new SysUserModMap()); modelBuilder.ApplyConfiguration(new SysTestMap()); + modelBuilder.ApplyConfiguration(new SysUserSalaryMap()); } public IRouteTail RouteTail { get; set; } diff --git a/samples/Sample.SqlServer/Domain/Entities/SysUserMod.cs b/samples/Sample.SqlServer/Domain/Entities/SysUserMod.cs index 82e65940..3fb0f392 100644 --- a/samples/Sample.SqlServer/Domain/Entities/SysUserMod.cs +++ b/samples/Sample.SqlServer/Domain/Entities/SysUserMod.cs @@ -27,5 +27,6 @@ namespace Sample.SqlServer.Domain.Entities /// 用户姓名 /// public int Age { get; set; } + public int AgeGroup { get; set; } } } \ No newline at end of file diff --git a/samples/Sample.SqlServer/Domain/Entities/SysUserSalary.cs b/samples/Sample.SqlServer/Domain/Entities/SysUserSalary.cs new file mode 100644 index 00000000..ed48f076 --- /dev/null +++ b/samples/Sample.SqlServer/Domain/Entities/SysUserSalary.cs @@ -0,0 +1,42 @@ +using ShardingCore.Core; + +namespace Sample.SqlServer.Domain.Entities +{ +/* +* @Author: xjm +* @Description: +* @Date: Monday, 01 February 2021 15:43:22 +* @Email: 326308290@qq.com +*/ + public class SysUserSalary:IShardingTable + { + public string Id { get; set; } + public string UserId { get; set; } + /// + /// 每月的金额 + /// + [ShardingTableKey] + public int DateOfMonth { get; set; } + /// + /// 工资 + /// + public int Salary { get; set; } + /// + /// 工资 + /// + public long SalaryLong { get; set; } + + /// + /// 工资 + /// + public decimal SalaryDecimal { get; set; } + /// + /// 工资 + /// + public double SalaryDouble { get; set; } + /// + /// 工资 + /// + public float SalaryFloat { get; set; } + } +} \ No newline at end of file diff --git a/samples/Sample.SqlServer/Domain/Maps/SysUserSalaryMap.cs b/samples/Sample.SqlServer/Domain/Maps/SysUserSalaryMap.cs new file mode 100644 index 00000000..287c3975 --- /dev/null +++ b/samples/Sample.SqlServer/Domain/Maps/SysUserSalaryMap.cs @@ -0,0 +1,23 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using Sample.SqlServer.Domain.Entities; + +namespace Sample.SqlServer.Domain.Maps +{ +/* +* @Author: xjm +* @Description: +* @Date: Monday, 01 February 2021 15:42:35 +* @Email: 326308290@qq.com +*/ + public class SysUserSalaryMap:IEntityTypeConfiguration + { + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(o => o.Id); + builder.Property(o => o.Id).IsRequired().HasMaxLength(128); + builder.Property(o => o.UserId).IsRequired().HasMaxLength(128); + builder.ToTable(nameof(SysUserSalary)); + } + } +} \ No newline at end of file diff --git a/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs index 74ae3355..1c4ad58b 100644 --- a/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs +++ b/samples/Sample.SqlServer/Shardings/SysUserModPaginationConfiguration.cs @@ -12,7 +12,7 @@ namespace Sample.SqlServer.Shardings public void Configure(PaginationBuilder builder) { builder.PaginationSequence(o => o.Id) - .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.FirstMatch); + .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); } } } diff --git a/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs b/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs new file mode 100644 index 00000000..b215a245 --- /dev/null +++ b/samples/Sample.SqlServer/Shardings/SysUserSalaryPaginationConfiguration.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Sample.SqlServer.Domain.Entities; +using ShardingCore.Sharding.PaginationConfigurations; + +namespace Sample.SqlServer.Shardings +{ + public class SysUserSalaryPaginationConfiguration:IPaginationConfiguration + { + public void Configure(PaginationBuilder builder) + { + //builder.PaginationSequence(o => o.Id) + // .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch); + //builder.PaginationSequence(o => o.DateOfMonth) + // .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone(); + builder.ConfigReverseShardingPage(); + } + } +} diff --git a/samples/Sample.SqlServer/Shardings/SysUserSalaryVirtualTableRoute.cs b/samples/Sample.SqlServer/Shardings/SysUserSalaryVirtualTableRoute.cs new file mode 100644 index 00000000..d850c45e --- /dev/null +++ b/samples/Sample.SqlServer/Shardings/SysUserSalaryVirtualTableRoute.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using Sample.SqlServer.Domain.Entities; +using ShardingCore.Core.VirtualRoutes; +using ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions; +using ShardingCore.Sharding.PaginationConfigurations; + +namespace Sample.SqlServer.Shardings +{ +/* +* @Author: xjm +* @Description: +* @Date: Monday, 01 February 2021 15:54:55 +* @Email: 326308290@qq.com +*/ + public class SysUserSalaryVirtualTableRoute:AbstractShardingOperatorVirtualTableRoute + { + protected override int ConvertToShardingKey(object shardingKey) + { + return Convert.ToInt32(shardingKey); + } + + public override string ShardingKeyToTail(object shardingKey) + { + var time = ConvertToShardingKey(shardingKey); + return TimeFormatToTail(time); + } + + + public override List GetAllTails() + { + var beginTime = new DateTime(2020, 1, 1); + var endTime = new DateTime(2021, 12, 1); + var list = new List(24); + var tempTime = beginTime; + while (tempTime <= endTime) + { + list.Add($"{tempTime:yyyyMM}"); + tempTime = tempTime.AddMonths(1); + } + + return list; + } + + protected string TimeFormatToTail(int time) + { + var dateOfMonth=DateTime.ParseExact($"{time}","yyyyMM",System.Globalization.CultureInfo.InvariantCulture,System.Globalization.DateTimeStyles.AdjustToUniversal); + return $"{dateOfMonth:yyyyMM}"; + } + + protected override Expression> GetRouteToFilter(int shardingKey, ShardingOperatorEnum shardingOperator) + { + var t = TimeFormatToTail(shardingKey); + switch (shardingOperator) + { + case ShardingOperatorEnum.GreaterThan: + case ShardingOperatorEnum.GreaterThanOrEqual: + return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0; + case ShardingOperatorEnum.LessThan: + return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0; + case ShardingOperatorEnum.LessThanOrEqual: + return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0; + case ShardingOperatorEnum.Equal: return tail => tail == t; + default: + { +#if DEBUG + Console.WriteLine($"shardingOperator is not equal scan all table tail"); +#endif + return tail => true; + } + } + } + + public override IPaginationConfiguration CreatePaginationConfiguration() + { + return new SysUserSalaryPaginationConfiguration(); + } + } +} \ No newline at end of file diff --git a/samples/Sample.SqlServer/Startup.cs b/samples/Sample.SqlServer/Startup.cs index bfe84e78..33791e3c 100644 --- a/samples/Sample.SqlServer/Startup.cs +++ b/samples/Sample.SqlServer/Startup.cs @@ -38,6 +38,7 @@ namespace Sample.SqlServer //.ReplaceService()//支持sqlserver2008r2 );//使用链接字符串创建dbcontext op.AddShardingTableRoute(); + op.AddShardingTableRoute(); }); ////不支持MARS不支持追踪的 diff --git a/src/ShardingCore/Extensions/IShardingQueryableExtension.cs b/src/ShardingCore/Extensions/IShardingQueryableExtension.cs index a7d9976b..368fd178 100644 --- a/src/ShardingCore/Extensions/IShardingQueryableExtension.cs +++ b/src/ShardingCore/Extensions/IShardingQueryableExtension.cs @@ -1,6 +1,7 @@ using System.Linq; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.Internal.Visitors; +using ShardingCore.Sharding.Visitors; namespace ShardingCore.Extensions { @@ -29,7 +30,8 @@ namespace ShardingCore.Extensions /// internal static IQueryable RemoveSkip(this IQueryable source) { - return (IQueryable)source.Provider.CreateQuery(new RemoveSkipVisitor().Visit(source.Expression)); + var expression = new RemoveSkipVisitor().Visit(source.Expression); + return (IQueryable)source.Provider.CreateQuery(expression); } /// @@ -43,6 +45,17 @@ namespace ShardingCore.Extensions var expression = new RemoveTakeVisitor().Visit(source.Expression); return (IQueryable) source.Provider.CreateQuery(expression); } + internal static IQueryable RemoveOrderBy(this IQueryable source) + { + var expression = new RemoveOrderByVisitor().Visit(source.Expression); + return (IQueryable) source.Provider.CreateQuery(expression); + } + + internal static IQueryable RemoveOrderByDescending(this IQueryable source) + { + var expression = new RemoveOrderByDescendingVisitor().Visit(source.Expression); + return (IQueryable) source.Provider.CreateQuery(expression); + } /// /// 切换数据源,保留原数据源中的Expression diff --git a/src/ShardingCore/Extensions/InternalExtensions/InternalLinqExtension.cs b/src/ShardingCore/Extensions/InternalExtensions/InternalLinqExtension.cs new file mode 100644 index 00000000..80e087ce --- /dev/null +++ b/src/ShardingCore/Extensions/InternalExtensions/InternalLinqExtension.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; + +namespace ShardingCore.Extensions.InternalExtensions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 10:13:07 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal static class InternalLinqExtension + { + public static IEnumerable OrderByIf(this IEnumerable source, Func keySelector, bool condition, + IComparer? comparer) + { + return condition ? source.OrderBy(keySelector, comparer) : source; + } + public static IEnumerable OrderByDescendingIf(this IEnumerable source, Func keySelector, bool condition, + IComparer? comparer) + { + return condition ? source.OrderByDescending(keySelector, comparer) : source; + } + } +} diff --git a/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs b/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs new file mode 100644 index 00000000..ab7a1da5 --- /dev/null +++ b/src/ShardingCore/Extensions/InternalExtensions/InternalPaginationMetadataExtension.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Sharding.PaginationConfigurations; + +namespace ShardingCore.Extensions.InternalExtensions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 13:21:09 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal static class InternalPaginationMetadataExtension + { + internal static bool IsUseReverse(this PaginationMetadata paginationMetadata,int skip,long total) + { + if (total < paginationMetadata.ReverseTotalGe) + return false; + + return paginationMetadata.ReverseFactor * total < skip; + } + } +} diff --git a/src/ShardingCore/Extensions/ObjectExtension.cs b/src/ShardingCore/Extensions/ObjectExtension.cs index b004de26..cdb54801 100644 --- a/src/ShardingCore/Extensions/ObjectExtension.cs +++ b/src/ShardingCore/Extensions/ObjectExtension.cs @@ -72,6 +72,17 @@ namespace ShardingCore.Extensions return null; } } + /// + /// 类型X是否包含某个属性 + /// + /// + /// + /// + public static bool ContainPropertyName(this Type type, string propertyName) + { + var property = type.GetProperty(propertyName, _bindingFlags); + return property != null; + } } } \ No newline at end of file diff --git a/src/ShardingCore/Extensions/ShardingQueryableExtension.cs b/src/ShardingCore/Extensions/ShardingQueryableExtension.cs new file mode 100644 index 00000000..46ca5585 --- /dev/null +++ b/src/ShardingCore/Extensions/ShardingQueryableExtension.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using ShardingCore.Core.ShardingPage.Abstractions; + +namespace ShardingCore.Extensions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 10:36:51 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public static class ShardingQueryableExtension + { + public static async Task> ToShardingPageAsync(this IQueryable source, int pageIndex, int pageSize) + { + //设置每次获取多少页 + var take = pageSize <= 0 ? 1 : pageSize; + //设置当前页码最小1 + var index = pageIndex <= 0 ? 1 : pageIndex; + //需要跳过多少页 + var skip = (index - 1) * take; + var shardingPageManager = ShardingContainer.GetService(); + using (shardingPageManager.CreateScope()) + { + //获取每次总记录数 + var count = await source.LongCountAsync(); + if (count <= skip) + return new ShardingPagedResult(new List(0), count); + var data = await source.Skip(skip).Take(take).ToListAsync(); + return new ShardingPagedResult(data, count); + } + } + public static ShardingPagedResult ToShardingPage(this IQueryable source, int pageIndex, int pageSize) + { + //设置每次获取多少页 + var take = pageSize <= 0 ? 1 : pageSize; + //设置当前页码最小1 + var index = pageIndex <= 0 ? 1 : pageIndex; + //需要跳过多少页 + var skip = (index - 1) * take; + + var shardingPageManager = ShardingContainer.GetService(); + using (shardingPageManager.CreateScope()) + { + //获取每次总记录数 + var count = source.Count(); + if (count <= skip) + return new ShardingPagedResult(new List(0), count); + var data = source.Skip(skip).Take(take).ToList(); + return new ShardingPagedResult(data, count); + } + } + } +} diff --git a/src/ShardingCore/Extensions/TaskExtension.cs b/src/ShardingCore/Extensions/TaskExtension.cs index cb2bf408..15aa594e 100644 --- a/src/ShardingCore/Extensions/TaskExtension.cs +++ b/src/ShardingCore/Extensions/TaskExtension.cs @@ -134,125 +134,4 @@ namespace ShardingCore.Extensions } } } -} -static class TaskExtension -{ - /// - /// 是否成功 - /// - /// - /// - public static bool IsCompletedSuccessfully(this Task task) - { - return task.IsCompleted && !(task.IsCanceled || task.IsFaulted); - } - - - - /// - /// Waits for the task to complete, unwrapping any exceptions. - /// - /// The task. May not be null. - public static void WaitAndUnwrapException(this Task task) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - task.GetAwaiter().GetResult(); - } - - /// - /// Waits for the task to complete, unwrapping any exceptions. - /// - /// The task. May not be null. - /// A cancellation token to observe while waiting for the task to complete. - /// The was cancelled before the completed, or the raised an . - public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - try - { - task.Wait(cancellationToken); - } - catch (AggregateException ex) - { - ExceptionDispatchInfo.Capture(ex).Throw(); - throw ex; - } - } - - /// - /// Waits for the task to complete, unwrapping any exceptions. - /// - /// The type of the result of the task. - /// The task. May not be null. - /// The result of the task. - public static TResult WaitAndUnwrapException(this Task task) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - return task.GetAwaiter().GetResult(); - } - - /// - /// Waits for the task to complete, unwrapping any exceptions. - /// - /// The type of the result of the task. - /// The task. May not be null. - /// A cancellation token to observe while waiting for the task to complete. - /// The result of the task. - /// The was cancelled before the completed, or the raised an . - public static TResult WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - try - { - task.Wait(cancellationToken); - return task.Result; - } - catch (AggregateException ex) - { - ExceptionDispatchInfo.Capture(ex).Throw(); - throw ex; - } - } - - /// - /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. - /// - /// The task. May not be null. - public static void WaitWithoutException(this Task task) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - try - { - task.Wait(); - } - catch (AggregateException) - { - } - } - - /// - /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. - /// - /// The task. May not be null. - /// A cancellation token to observe while waiting for the task to complete. - /// The was cancelled before the completed. - public static void WaitWithoutException(this Task task, CancellationToken cancellationToken) - { - if (task == null) - throw new ArgumentNullException(nameof(task)); - try - { - task.Wait(cancellationToken); - } - catch (AggregateException) - { - cancellationToken.ThrowIfCancellationRequested(); - } - } -} -} +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs new file mode 100644 index 00000000..db5bd03d --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 15:30:32 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class InMemoryReverseStreamMergeAsyncEnumerator:IStreamMergeAsyncEnumerator + { + private readonly IStreamMergeAsyncEnumerator _inMemoryStreamMergeAsyncEnumerator; + private bool _first = true; + private IEnumerator _reverseEnumerator = Enumerable.Empty().GetEnumerator(); + public InMemoryReverseStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator inMemoryStreamMergeAsyncEnumerator) + { + _inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator; + } + public async ValueTask DisposeAsync() + { + await _inMemoryStreamMergeAsyncEnumerator.DisposeAsync(); + _reverseEnumerator.Dispose(); + } + + public async ValueTask MoveNextAsync() + { + if (_first) + { + ICollection _reverseCollection = new LinkedList(); + while(await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync()) + { + _reverseCollection.Add(_inMemoryStreamMergeAsyncEnumerator.Current); + } + + _reverseEnumerator = _reverseCollection.Reverse().GetEnumerator(); + _first = false; + } + + return _reverseEnumerator.MoveNext(); + } + + public T Current => _reverseEnumerator.Current; + public bool SkipFirst() + { + throw new NotImplementedException(); + } + + public bool HasElement() + { + throw new NotImplementedException(); + } + + public T ReallyCurrent => Current; + } +} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs index 6859c391..74afc8ed 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs @@ -32,13 +32,15 @@ namespace ShardingCore.Sharding.PaginationConfigurations return new PaginationOrderPropertyBuilder(orderPropertyExpression, _metadata); } /// - /// 配置当跳过多少条后开始启用只能分页 + /// 配置反向排序 仅支持单排序 当skip>= reverseTotalGe*reverseFactor使用反向排序 /// - /// + /// + /// /// - public PaginationBuilder ConfigUseShardingPageIfGeSkip(long skip) + public PaginationBuilder ConfigReverseShardingPage(double reverseFactor=0.5,long reverseTotalGe=10000L) { - _metadata.UseShardingPageIfGeSkipAvg = skip; + _metadata.ReverseFactor = reverseFactor; + _metadata.ReverseTotalGe = reverseTotalGe; return this; } /// diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs index 8a4568c5..4f20f7ee 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationConfig.cs @@ -30,10 +30,15 @@ namespace ShardingCore.Sharding.PaginationConfigurations public IComparer TailComparer { get; set; } public PaginationMatchEnum PaginationMatchEnum { get; set; } public PropertyInfo OrderPropertyInfo { get; set; } + /// /// 如果查询没发现排序就将当前配置追加上去 /// - public bool AppendIfOrderNone { get; set; } + public bool AppendIfOrderNone => AppendOrder >= 0; + /// + /// 大于等于0表示需要 + /// + public int AppendOrder { get; set; } = -1; public string PropertyName { get;} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs index fbea942e..44f614e0 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMatchEnum.cs @@ -22,6 +22,6 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// /// һƥͿ /// - FirstMatch = 1 << 2 + PrimaryMatch = 1 << 2 } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs index f8968190..1f9bf7d3 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationMetadata.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text; namespace ShardingCore.Sharding.PaginationConfigurations @@ -17,10 +18,20 @@ namespace ShardingCore.Sharding.PaginationConfigurations public class PaginationMetadata { public ISet PaginationConfigs = new HashSet(); + /// - /// 配置生效当跳过多少条后 GREATER THAN OR EQUAL + /// 反向排序因子 /// - public long UseShardingPageIfGeSkipAvg { get; set; } = 3000L; + public double ReverseFactor { get; set; } = -1; + + /// + /// 当条数大于多少条后采用反向排序 + /// + public long ReverseTotalGe { get; set; } = 10000L; + /// + /// 是否已开启反向排序 仅支持单排序 + /// + public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 1000; /// /// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL /// diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs index 362c89f2..784fae46 100644 --- a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationOrderPropertyBuilder.cs @@ -44,10 +44,11 @@ namespace ShardingCore.Sharding.PaginationConfigurations /// /// 如果查询没发现排序就将当前配置追加上去 /// + /// 大于等于0生效,越大优先级越高 /// - public PaginationOrderPropertyBuilder UseAppendIfOrderNone() + public PaginationOrderPropertyBuilder UseAppendIfOrderNone(int order=0) { - _paginationConfig.AppendIfOrderNone = true; + _paginationConfig.AppendOrder = order; return this; } } diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs deleted file mode 100644 index 5718836e..00000000 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; -using ShardingCore.Sharding.Abstractions; - -namespace ShardingCore.Sharding.ShardingQueryExecutors -{ - /* - * @Author: xjm - * @Description: - * @Date: 2021/8/30 17:11:40 - * @Ver: 1.0 - * @Email: 326308290@qq.com - */ - public class AbstractShardingQueryExecutor: IShardingQueryExecutor - { - private readonly MethodCallExpression _expression; - private readonly IShardingDbContext _shardingDbContext; - - public AbstractShardingQueryExecutor(MethodCallExpression expression,IShardingDbContext shardingDbContext) - { - _expression = expression; - _shardingDbContext = shardingDbContext; - } - public MethodCallExpression GetQueryExpression() - { - return _expression; - } - - public IShardingDbContext GetCurrentShardingDbContext() - { - return _shardingDbContext; - } - } -} diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs index 3ede8cfd..46468d6d 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs @@ -4,9 +4,11 @@ using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading; +using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Core.VirtualTables; using ShardingCore.Extensions; +using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines; @@ -41,33 +43,76 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors return new SingleQueryEnumeratorAsyncStreamMergeEngine(_streamMergeContext); } //未开启系统分表或者本次查询涉及多张分表 - if (!_streamMergeContext.IsPaginationQuery()||!_streamMergeContext.IsSingleShardingTableQuery()||_shardingPageManager.Current == null) + if (_streamMergeContext.IsPaginationQuery()&&_streamMergeContext.IsSingleShardingTableQuery()&&_shardingPageManager.Current != null) { - return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext); - } - var shardingEntityType = _streamMergeContext.RouteResults.First().ReplaceTables.Single(o=>o.IsShardingTable()).EntityType; - var virtualTable = _virtualTableManager.GetVirtualTable(_streamMergeContext.GetShardingDbContext().ShardingDbContextType,shardingEntityType); - if (!virtualTable.EnablePagination) - return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext); - if (_streamMergeContext.Orders.IsEmpty()) - { - var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone); - if (append != null) + //获取虚拟表判断是否启用了分页配置 + var shardingEntityType = _streamMergeContext.RouteResults.First().ReplaceTables.First().EntityType; + var virtualTable = _virtualTableManager.GetVirtualTable(_streamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType); + if (virtualTable.EnablePagination) { - 123 - return new SequenceEnumeratorAsyncStreamMergeEngine(_streamMergeContext); + var paginationMetadata = virtualTable.PaginationMetadata; + //判断本次查询的排序是否包含order,如果不包含就获取默认添加的排序 + if (_streamMergeContext.Orders.IsEmpty()) + { + //除了判断属性名还要判断所属关系 + var appendPaginationConfig = paginationMetadata.PaginationConfigs.OrderByDescending(o=>o.AppendOrder) + .FirstOrDefault(o => o.AppendIfOrderNone&&typeof(TEntity).ContainPropertyName(o.PropertyName)&& PaginationMatch(o)); + if (appendPaginationConfig != null) + { + return new AppenOrderSequenceEnumeratorAsyncStreamMergeEngine(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults); + } + } + else + { + var primaryOrder = _streamMergeContext.Orders.First(); + var appendPaginationConfig = paginationMetadata.PaginationConfigs.FirstOrDefault(o => PaginationMatch(o,primaryOrder)); + if (appendPaginationConfig != null) + { + return new SequenceEnumeratorAsyncStreamMergeEngine(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc); + } + if (_streamMergeContext.Orders.Count() == 1) + { + //skip过大reserve skip + if (paginationMetadata.EnableReverseShardingPage &&_streamMergeContext.Take.GetValueOrDefault()>0) + { + var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult); + if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total)) + { + return new ReverseShardingEnumeratorAsyncStreamMergeEngine( + _streamMergeContext, primaryOrder, total); + } + } + } + } } } - var propertyOrder = _streamMergeContext.Orders.First(); - //PaginationMatchEnum.Owner - 111 - var paginationConfig = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.PropertyName==propertyOrder.PropertyExpression); - if (paginationConfig==null) - return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext); - //调用顺序排序 - paginationConfig - + + return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext); + } + + private bool PaginationMatch(PaginationConfig paginationConfig) + { + if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&& !paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named)) + return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType; + + return false; + } + private bool PaginationMatch(PaginationConfig paginationConfig,PropertyOrder propertyOrder) + { + if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch)) + { + if (!propertyOrder.PropertyExpression.StartsWith(paginationConfig.PropertyName)) + return false; + } + + if (propertyOrder.PropertyExpression != paginationConfig.PropertyName) + return false; + + if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&&!paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named)) + return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType; + + return false; } } diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index f3c6ec5f..a3343204 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -31,7 +31,7 @@ namespace ShardingCore.Sharding //public DataSourceRoutingResult RoutingResult { get; } public int? Skip { get;} public int? Take { get; } - public IEnumerable Orders { get;} + public IEnumerable Orders { get; private set; } public SelectContext SelectContext { get;} public GroupByContext GroupByContext { get; } @@ -68,7 +68,10 @@ namespace ShardingCore.Sharding // GroupByContext = reWriteResult.GroupByContext; // _reWriteSource = reWriteResult.ReWriteQueryable; //} - + public void ReSetOrders(IEnumerable orders) + { + Orders = orders; + } public DbContext CreateDbContext(RouteResult routeResult) { var routeTail = _routeTailFactory.Create(routeResult); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs index da687eae..931b93b5 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs @@ -106,7 +106,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions public List Execute(Func efQuery, CancellationToken cancellationToken = new CancellationToken()) { - var tableResult = _mergeContext.GetRouteResults(); + var tableResult = _mergeContext.RouteResults; var enumeratorTasks = tableResult.Select(routeResult => { return Task.Run(() => diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index 8e3e0354..9e4924a0 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -11,6 +11,7 @@ using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; using ShardingCore.Sharding.Enumerators.StreamMergeSync; +using ShardingCore.Sharding.ShardingQueryExecutors; #if EFCORE2 using Microsoft.EntityFrameworkCore.Extensions.Internal; @@ -24,15 +25,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Date: Saturday, 14 August 2021 22:07:28 * @Email: 326308290@qq.com */ - public class AsyncEnumerableStreamMergeEngine : IAsyncEnumerable, IEnumerable, IDisposable + public class AsyncEnumerableStreamMergeEngine : IAsyncEnumerable, IEnumerable { private readonly StreamMergeContext _mergeContext; - private readonly ICollection _parllelDbbContexts; public AsyncEnumerableStreamMergeEngine(StreamMergeContext mergeContext) { _mergeContext = mergeContext; - _parllelDbbContexts = new LinkedList(); } @@ -46,7 +45,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines } public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) { - return GetShardingEnumerator(); + return new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync(cancellationToken) + .GetAsyncEnumerator(cancellationToken); } #endif @@ -62,47 +62,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines return GetShardingEnumerator(); } #endif - - private IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult,int routeCount) - { - var shardingDbContext = _mergeContext.CreateDbContext(routeResult); - var useOriginal = routeCount>1; - _parllelDbbContexts.Add(shardingDbContext); - var newQueryable = (IQueryable)(useOriginal?_mergeContext.GetReWriteQueryable():_mergeContext.GetOriginalQueryable()) - .ReplaceDbContextQueryable(shardingDbContext); - return newQueryable; - } - - private IAsyncEnumerator GetShardingEnumerator() - { - var tableResult = _mergeContext.RouteResults; - var routeCount = tableResult.Count(); - var enumeratorTasks = tableResult.Select(routeResult => - { - return Task.Run(async () => - { - try - { - var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); - - var asyncEnumerator = await GetAsyncEnumerator(newQueryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); - }).ToArray(); - - var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); - if (routeCount>1&&_mergeContext.HasSkipTake()) - return new PaginationStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators); - if (_mergeContext.HasGroupQuery()) - return new MultiAggregateOrderStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators); - return new MultiOrderStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators); - } + private IEnumerator GetEnumerator(IQueryable newQueryable) @@ -114,33 +74,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines public IEnumerator GetEnumerator() { - var tableResult = _mergeContext.RouteResults; - var routeCount = tableResult.Count(); - var enumeratorTasks = tableResult.Select(routeResult => - { - return Task.Run(() => - { - try - { - var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); - var enumerator = GetEnumerator(newQueryable); - return new StreamMergeEnumerator(enumerator); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); - }).ToArray(); - - var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); - if (routeCount > 1 && _mergeContext.HasSkipTake()) - return new PaginationStreamMergeEnumerator(_mergeContext, streamEnumerators); - if (_mergeContext.HasGroupQuery()) - return new MultiAggregateOrderStreamMergeEnumerator(_mergeContext, streamEnumerators); - return new MultiOrderStreamMergeEnumerator(_mergeContext, streamEnumerators); + return new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync() + .GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() @@ -148,22 +84,5 @@ namespace ShardingCore.Sharding.StreamMergeEngines return GetEnumerator(); } - public void Dispose() - { - if (_parllelDbbContexts.IsNotEmpty()) - { - _parllelDbbContexts.ForEach(o => - { - try - { - o.Dispose(); - } - catch (Exception e) - { - Console.WriteLine(e); - } - }); - } - } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..bc587bb0 --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ShardingCore.Core.Internal.Visitors; +using ShardingCore.Core.ShardingPage.Abstractions; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Core.VirtualTables; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.PaginationConfigurations; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 8:09:18 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + { + private readonly PaginationConfig _appendPaginationConfig; + private readonly ICollection> _routeQueryResults; + private IShardingPageManager _shardingPageManager; + private IVirtualTableManager _virtualTableManager; + public AppenOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, PaginationConfig appendPaginationConfig, ICollection> routeQueryResults) : base(streamMergeContext) + { + _appendPaginationConfig = appendPaginationConfig; + _routeQueryResults = routeQueryResults; + _shardingPageManager = ShardingContainer.GetService(); + _virtualTableManager = ShardingContainer.GetService(); + } + + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + { + var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); + var skip = StreamMergeContext.Skip.GetValueOrDefault(); + if (skip < 0) + throw new ShardingCoreException("skip must ge 0"); + + var take = StreamMergeContext.Take; + if (take.HasValue&&take.Value <= 0) + throw new ShardingCoreException("take must gt 0"); + + var sortRouteResults = _routeQueryResults.Select(o => new + { + Tail = o.RouteResult.ReplaceTables.First().Tail, + RouteQueryResult = o + }).OrderBy(o => o.Tail, _appendPaginationConfig.TailComparer).ToList(); + var skipCount = skip; + + var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o=>o.RouteQueryResult)).Skip(skip).Take(take).ToList(); + + StreamMergeContext.ReSetOrders(new PropertyOrder[] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) }); + var enumeratorTasks = sequenceResults.Select(sequenceResult => + { + var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult); + return Task.Run(async () => + { + try + { + var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); + return streamEnumerators; + } + + private IQueryable CreateAsyncExecuteQueryable(IQueryable noPaginationQueryable, SequenceResult sequenceResult) + { + var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.RouteResult); + DbContextQueryStore.TryAdd(sequenceResult.RouteResult, shardingDbContext); + var newQueryable = (IQueryable)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(new PropertyOrder[]{new PropertyOrder(_appendPaginationConfig.PropertyName,true)})) + .ReplaceDbContextQueryable(shardingDbContext); + return newQueryable; + } + + public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (StreamMergeContext.HasGroupQuery()) + return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs new file mode 100644 index 00000000..61ade0b0 --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 8:31:20 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class SequencePaginationList + { + private readonly IEnumerable> _routeQueryResults; + private long? _skip; + private long? _take; + + public SequencePaginationList(IEnumerable> routeQueryResults) + { + _routeQueryResults = routeQueryResults; + } + public SequencePaginationList Skip(long? skip) + { + _skip = skip; + return this; + } + public SequencePaginationList Take(long? take) + { + _take = take; + return this; + } + + public ICollection ToList() + { + ICollection routeResults = new LinkedList(); + + var currentSkip = _skip.GetValueOrDefault(); + var currentTake = _take; + bool stopSkip = false; + bool needBreak = false; + foreach (var routeQueryResult in _routeQueryResults) + { + if (!stopSkip) + { + if (routeQueryResult.QueryResult >= currentSkip) + { + stopSkip = true; + } + else + { + currentSkip = currentSkip - routeQueryResult.QueryResult; + continue; + } + } + + var currentRealSkip = currentSkip; + var currentRealTake = routeQueryResult.QueryResult-currentSkip; + if (currentSkip != 0) + currentSkip = 0; + if (currentTake.HasValue) + { + if (currentTake.Value <= currentRealTake) + { + currentRealTake = currentTake.Value; + needBreak = true; + } + else + { + currentRealTake = currentTake.Value-currentRealTake; + } + } + var sequenceResult = new SequenceResult(currentRealSkip, currentRealTake, routeQueryResult.RouteResult); + routeResults.Add(sequenceResult); + + if (needBreak) + break; + + } + + return routeResults; + } + } + public class SequenceResult + { + public SequenceResult(long skip, long take, RouteResult routeResult) + { + Skip = (int)skip; + Take = (int)take; + RouteResult = routeResult; + } + + public int Skip { get; } + public int Take { get; } + + public RouteResult RouteResult { get; } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..bb5ae50f --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ShardingCore.Core.Internal.Visitors; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 13:32:12 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class ReverseShardingEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + { + private readonly PropertyOrder _primaryOrder; + private readonly long _total; + + public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, PropertyOrder primaryOrder, long total) : base(streamMergeContext) + { + _primaryOrder = primaryOrder; + _total = total; + } + + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + { + + var noPaginationNoOrderQueryable = _primaryOrder.IsAsc ? StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderBy(): StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderByDescending(); + var skip = StreamMergeContext.Skip.GetValueOrDefault(); + var take = StreamMergeContext.Take.GetValueOrDefault(); + var realSkip = _total- take- skip; + var tableResult = StreamMergeContext.RouteResults; + var reverseOrderQueryable = noPaginationNoOrderQueryable.Skip((int)realSkip).Take((int)realSkip+take).OrderWithExpression(new List() + { + new PropertyOrder( _primaryOrder.PropertyExpression,!_primaryOrder.IsAsc) + }); + var enumeratorTasks = tableResult.Select(routeResult => + { + var newQueryable = CreateAsyncExecuteQueryable(reverseOrderQueryable,routeResult); + return Task.Run(async () => + { + try + { + var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); + return streamEnumerators; + } + + private IQueryable CreateAsyncExecuteQueryable(IQueryable reverseOrderQueryable, RouteResult routeResult) + { + var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + DbContextQueryStore.TryAdd(routeResult, shardingDbContext); + var newQueryable = (IQueryable)reverseOrderQueryable + .ReplaceDbContextQueryable(shardingDbContext); + return newQueryable; + } + + public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamsAsyncEnumerators); + return new InMemoryReverseStreamMergeAsyncEnumerator(doGetStreamMergeAsyncEnumerator); + } + + private IStreamMergeAsyncEnumerator DoGetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + if (StreamMergeContext.IsPaginationQuery()) + return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + if (StreamMergeContext.HasGroupQuery()) + return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs index 209050a8..cc3d3f60 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -3,14 +3,18 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; +using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Core.VirtualTables; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.PaginationConfigurations; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines { @@ -23,47 +27,43 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines */ public class SequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine { - private IShardingPageManager _shardingPageManager; - private IVirtualTableManager _virtualTableManager; - public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + private readonly PaginationConfig _orderPaginationConfig; + private readonly ICollection> _routeQueryResults; + private readonly bool _isAsc; + public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext, PaginationConfig orderPaginationConfig, ICollection> routeQueryResults, bool isAsc) : base(streamMergeContext) { - _shardingPageManager = ShardingContainer.GetService(); - _virtualTableManager = ShardingContainer.GetService(); + _orderPaginationConfig = orderPaginationConfig; + _routeQueryResults = routeQueryResults; + _isAsc = isAsc; } public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() { - var routeQueryResults = _shardingPageManager.Current.RouteQueryResults.ToList(); - if (routeQueryResults.Any(o => o.RouteResult.ReplaceTables.Count(p=>p.IsShardingTable())!=1) - || routeQueryResults.HasDifference(o=>o.RouteResult.ReplaceTables.First().EntityType)) - throw new InvalidOperationException($"error sharding page:[{StreamMergeContext.GetOriginalQueryable().Expression.ShardingPrint()}]"); - var shardingEntityType = routeQueryResults[0].RouteResult.ReplaceTables.FirstOrDefault(o=>o.IsShardingTable()).EntityType; - var virtualTable = _virtualTableManager.GetVirtualTable(StreamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType); - if (!virtualTable.EnablePagination) - { - throw new ShardingCoreException("not support Sequence enumerator"); - } + var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); + var skip = StreamMergeContext.Skip.GetValueOrDefault(); + if (skip < 0) + throw new ShardingCoreException("skip must ge 0"); - if (base.StreamMergeContext.Orders.IsEmpty()) - { - var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone); - if (append != null) - { - StreamMergeContext.GetOriginalQueryable().OrderBy("") - } - } + var take = StreamMergeContext.Take; + if (take.HasValue && take.Value <= 0) + throw new ShardingCoreException("take must gt 0"); - - var tableResult = StreamMergeContext.RouteResults; - var routeCount = tableResult.Count(); - var enumeratorTasks = tableResult.Select(routeResult => + var sortRouteResults = _routeQueryResults.Select(o => new { + Tail = o.RouteResult.ReplaceTables.First().Tail, + RouteQueryResult = o + }).OrderByIf(o => o.Tail, _isAsc, _orderPaginationConfig.TailComparer) + .OrderByDescendingIf(o => o.Tail, !_isAsc, _orderPaginationConfig.TailComparer).ToList(); + + var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList(); + + var enumeratorTasks = sequenceResults.Select(sequenceResult => + { + var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult); return Task.Run(async () => { try { - var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); return new StreamMergeAsyncEnumerator(asyncEnumerator); } @@ -75,23 +75,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines }); }).ToArray(); - var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); + var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); return streamEnumerators; } - public override IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) + private IQueryable CreateAsyncExecuteQueryable(IQueryable noPaginationQueryable, SequenceResult sequenceResult) { - var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); - DbContextQueryStore.TryAdd(routeResult, shardingDbContext); - var newQueryable = (IQueryable)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.RouteResult); + DbContextQueryStore.TryAdd(sequenceResult.RouteResult, shardingDbContext); + var newQueryable = (IQueryable)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take)) .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; } public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { - if (_multiRouteQuery && StreamMergeContext.HasSkipTake()) - return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); if (StreamMergeContext.HasGroupQuery()) return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); diff --git a/src/ShardingCore/Sharding/Visitors/PropertyOrder.cs b/src/ShardingCore/Sharding/Visitors/PropertyOrder.cs index a4f7be64..cd6c0466 100644 --- a/src/ShardingCore/Sharding/Visitors/PropertyOrder.cs +++ b/src/ShardingCore/Sharding/Visitors/PropertyOrder.cs @@ -1,3 +1,5 @@ +using System; + namespace ShardingCore.Core.Internal.Visitors { /* diff --git a/src/ShardingCore/Sharding/Visitors/QuerySelectDiscoverVisitor.cs b/src/ShardingCore/Sharding/Visitors/QuerySelectDiscoverVisitor.cs index 98bbbbb5..c2049b77 100644 --- a/src/ShardingCore/Sharding/Visitors/QuerySelectDiscoverVisitor.cs +++ b/src/ShardingCore/Sharding/Visitors/QuerySelectDiscoverVisitor.cs @@ -48,11 +48,11 @@ namespace ShardingCore.Core.Internal.Visitors var method = methodCallExpression.Method; if (method.Name == nameof(Queryable.Count) || method.Name == nameof(Queryable.Sum) || method.Name == nameof(Queryable.Max) || method.Name == nameof(Queryable.Min) || method.Name == nameof(Queryable.Average)) { - _selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].Name,true,method.Name)); + _selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].DeclaringType, node.Members[i].Name,true,method.Name)); continue; } } - _selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].Name,false,string.Empty)); + _selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].DeclaringType, node.Members[i].Name,false,string.Empty)); } return base.VisitNew(node); } diff --git a/src/ShardingCore/Sharding/Visitors/RemoveOrderByDescendingVisitor.cs b/src/ShardingCore/Sharding/Visitors/RemoveOrderByDescendingVisitor.cs new file mode 100644 index 00000000..13ebf3a1 --- /dev/null +++ b/src/ShardingCore/Sharding/Visitors/RemoveOrderByDescendingVisitor.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; + +namespace ShardingCore.Sharding.Visitors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 15:05:27 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal class RemoveOrderByDescendingVisitor : ExpressionVisitor + { + protected override Expression VisitMethodCall(MethodCallExpression node) + { + if (node.Method.Name == nameof(Queryable.OrderByDescending)) + return base.Visit(node.Arguments[0]); + + return base.VisitMethodCall(node); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Visitors/RemoveOrderByVisitor.cs b/src/ShardingCore/Sharding/Visitors/RemoveOrderByVisitor.cs new file mode 100644 index 00000000..dce3164f --- /dev/null +++ b/src/ShardingCore/Sharding/Visitors/RemoveOrderByVisitor.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; + +namespace ShardingCore.Sharding.Visitors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/3 15:04:25 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal class RemoveOrderByVisitor : ExpressionVisitor + { + protected override Expression VisitMethodCall(MethodCallExpression node) + { + if (node.Method.Name == nameof(Queryable.OrderBy)) + return base.Visit(node.Arguments[0]); + + return base.VisitMethodCall(node); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Visitors/Selects/SelectProperty.cs b/src/ShardingCore/Sharding/Visitors/Selects/SelectProperty.cs index 2961a1ea..aeff5828 100644 --- a/src/ShardingCore/Sharding/Visitors/Selects/SelectProperty.cs +++ b/src/ShardingCore/Sharding/Visitors/Selects/SelectProperty.cs @@ -10,13 +10,14 @@ namespace ShardingCore.Core.Internal.Visitors.Selects */ public class SelectProperty { - public SelectProperty(string propertyName,bool isAggregateMethod,string aggregateMethod) + public SelectProperty( Type ownerType,string propertyName,bool isAggregateMethod,string aggregateMethod) { + OwnerType = ownerType; PropertyName = propertyName; IsAggregateMethod = isAggregateMethod; AggregateMethod = aggregateMethod; } - + public Type OwnerType { get; } public string PropertyName { get; } public bool IsAggregateMethod { get; } public string AggregateMethod { get; }