完成初步的分表分页组件

This commit is contained in:
xuejiaming 2021-09-03 16:00:12 +08:00
parent 7fa315cd0d
commit 0440a9e67d
37 changed files with 910 additions and 334 deletions

View File

@ -58,15 +58,15 @@ namespace Sample.SqlServer.Controllers
var sresultx11231 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981");
var sresultx1121 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Sum(o => o.Age);
var sresultx111 = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == "198");
var sresultx2 = _defaultTableDbContext.Set<SysUserMod>().Count(o => o.Age <= 10);
var sresultx = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefault();
var sresultx33 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault();
var sresultxc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).ToList();
var sresultxasdc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToList();
var sresult = _defaultTableDbContext.Set<SysUserMod>().ToList();
//var sresultx11231 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981");
//var sresultx1121 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Sum(o => o.Age);
//var sresultx111 = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == "198");
//var sresultx2 = _defaultTableDbContext.Set<SysUserMod>().Count(o => o.Age <= 10);
//var sresultx = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefault();
//var sresultx33 = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault();
//var sresultxc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).ToList();
//var sresultxasdc = _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToList();
//var sresult = _defaultTableDbContext.Set<SysUserMod>().ToList();
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");
_defaultTableDbContext.Attach(sysUserMod98);
@ -86,5 +86,11 @@ namespace Sample.SqlServer.Controllers
}
return Ok();
}
[HttpGet]
public async Task<IActionResult> Get1([FromQuery] int p,[FromQuery]int s)
{
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserSalary>().OrderByDescending(o=>o.DateOfMonth).ToShardingPageAsync(p, s);
return Ok(shardingPageResultAsync);
}
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Sample.SqlServer.DbContexts;
using Sample.SqlServer.Domain.Entities;
@ -25,14 +27,17 @@ namespace Sample.SqlServer
public static void DbSeed(this IApplicationBuilder app)
{
using (var scope=app.ApplicationServices.CreateScope())
using (var scope = app.ApplicationServices.CreateScope())
{
var virtualDbContext =scope.ServiceProvider.GetService<DefaultShardingDbContext>();
var virtualDbContext = scope.ServiceProvider.GetService<DefaultShardingDbContext>();
if (!virtualDbContext.Set<SysUserMod>().Any())
{
var ids = Enumerable.Range(1, 1000);
var userMods = new List<SysUserMod>();
var userSalaries = new List<SysUserSalary>();
var SysTests = new List<SysTest>();
var beginTime = new DateTime(2020, 1, 1);
var endTime = new DateTime(2021, 12, 1);
foreach (var id in ids)
{
userMods.Add(new SysUserMod()
@ -40,16 +45,38 @@ namespace Sample.SqlServer
Id = id.ToString(),
Age = id,
Name = $"name_{id}",
AgeGroup = Math.Abs(id % 10)
});
SysTests.Add(new SysTest()
{
Id = id.ToString(),
UserId = id.ToString()
});
var tempTime = beginTime;
var i = 0;
while (tempTime <= endTime)
{
var dateOfMonth = $@"{tempTime:yyyyMM}";
userSalaries.Add(new SysUserSalary()
{
Id = $@"{id}{dateOfMonth}",
UserId = id.ToString(),
DateOfMonth = int.Parse(dateOfMonth),
Salary = 700000 + id * 100 * i,
SalaryLong = 700000 + id * 100 * i,
SalaryDecimal = (700000 + id * 100 * i) / 100m,
SalaryDouble = (700000 + id * 100 * i) / 100d,
SalaryFloat = (700000 + id * 100 * i) / 100f
});
tempTime = tempTime.AddMonths(1);
i++;
}
}
virtualDbContext.AddRange(userMods);
virtualDbContext.AddRange(SysTests);
virtualDbContext.AddRange(userSalaries);
virtualDbContext.SaveChanges();
}
}

View File

@ -19,6 +19,7 @@ namespace Sample.SqlServer.DbContexts
base.OnModelCreating(modelBuilder);
modelBuilder.ApplyConfiguration(new SysUserModMap());
modelBuilder.ApplyConfiguration(new SysTestMap());
modelBuilder.ApplyConfiguration(new SysUserSalaryMap());
}
public override Type ShardingDbContextType => this.GetType();

View File

@ -17,6 +17,7 @@ namespace Sample.SqlServer.DbContexts
base.OnModelCreating(modelBuilder);
modelBuilder.ApplyConfiguration(new SysUserModMap());
modelBuilder.ApplyConfiguration(new SysTestMap());
modelBuilder.ApplyConfiguration(new SysUserSalaryMap());
}
public IRouteTail RouteTail { get; set; }

View File

@ -27,5 +27,6 @@ namespace Sample.SqlServer.Domain.Entities
/// 用户姓名
/// </summary>
public int Age { get; set; }
public int AgeGroup { get; set; }
}
}

View File

@ -0,0 +1,42 @@
using ShardingCore.Core;
namespace Sample.SqlServer.Domain.Entities
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 01 February 2021 15:43:22
* @Email: 326308290@qq.com
*/
public class SysUserSalary:IShardingTable
{
public string Id { get; set; }
public string UserId { get; set; }
/// <summary>
/// 每月的金额
/// </summary>
[ShardingTableKey]
public int DateOfMonth { get; set; }
/// <summary>
/// 工资
/// </summary>
public int Salary { get; set; }
/// <summary>
/// 工资
/// </summary>
public long SalaryLong { get; set; }
/// <summary>
/// 工资
/// </summary>
public decimal SalaryDecimal { get; set; }
/// <summary>
/// 工资
/// </summary>
public double SalaryDouble { get; set; }
/// <summary>
/// 工资
/// </summary>
public float SalaryFloat { get; set; }
}
}

View File

@ -0,0 +1,23 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Sample.SqlServer.Domain.Entities;
namespace Sample.SqlServer.Domain.Maps
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 01 February 2021 15:42:35
* @Email: 326308290@qq.com
*/
public class SysUserSalaryMap:IEntityTypeConfiguration<SysUserSalary>
{
public void Configure(EntityTypeBuilder<SysUserSalary> builder)
{
builder.HasKey(o => o.Id);
builder.Property(o => o.Id).IsRequired().HasMaxLength(128);
builder.Property(o => o.UserId).IsRequired().HasMaxLength(128);
builder.ToTable(nameof(SysUserSalary));
}
}
}

View File

@ -12,7 +12,7 @@ namespace Sample.SqlServer.Shardings
public void Configure(PaginationBuilder<SysUserMod> builder)
{
builder.PaginationSequence(o => o.Id)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.FirstMatch);
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Sample.SqlServer.Domain.Entities;
using ShardingCore.Sharding.PaginationConfigurations;
namespace Sample.SqlServer.Shardings
{
public class SysUserSalaryPaginationConfiguration:IPaginationConfiguration<SysUserSalary>
{
public void Configure(PaginationBuilder<SysUserSalary> builder)
{
//builder.PaginationSequence(o => o.Id)
// .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch);
//builder.PaginationSequence(o => o.DateOfMonth)
// .UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.PrimaryMatch).UseAppendIfOrderNone();
builder.ConfigReverseShardingPage();
}
}
}

View File

@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using Sample.SqlServer.Domain.Entities;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions;
using ShardingCore.Sharding.PaginationConfigurations;
namespace Sample.SqlServer.Shardings
{
/*
* @Author: xjm
* @Description:
* @Date: Monday, 01 February 2021 15:54:55
* @Email: 326308290@qq.com
*/
public class SysUserSalaryVirtualTableRoute:AbstractShardingOperatorVirtualTableRoute<SysUserSalary,int>
{
protected override int ConvertToShardingKey(object shardingKey)
{
return Convert.ToInt32(shardingKey);
}
public override string ShardingKeyToTail(object shardingKey)
{
var time = ConvertToShardingKey(shardingKey);
return TimeFormatToTail(time);
}
public override List<string> GetAllTails()
{
var beginTime = new DateTime(2020, 1, 1);
var endTime = new DateTime(2021, 12, 1);
var list = new List<string>(24);
var tempTime = beginTime;
while (tempTime <= endTime)
{
list.Add($"{tempTime:yyyyMM}");
tempTime = tempTime.AddMonths(1);
}
return list;
}
protected string TimeFormatToTail(int time)
{
var dateOfMonth=DateTime.ParseExact($"{time}","yyyyMM",System.Globalization.CultureInfo.InvariantCulture,System.Globalization.DateTimeStyles.AdjustToUniversal);
return $"{dateOfMonth:yyyyMM}";
}
protected override Expression<Func<string, bool>> GetRouteToFilter(int shardingKey, ShardingOperatorEnum shardingOperator)
{
var t = TimeFormatToTail(shardingKey);
switch (shardingOperator)
{
case ShardingOperatorEnum.GreaterThan:
case ShardingOperatorEnum.GreaterThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0;
case ShardingOperatorEnum.LessThan:
return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0;
case ShardingOperatorEnum.LessThanOrEqual:
return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
case ShardingOperatorEnum.Equal: return tail => tail == t;
default:
{
#if DEBUG
Console.WriteLine($"shardingOperator is not equal scan all table tail");
#endif
return tail => true;
}
}
}
public override IPaginationConfiguration<SysUserSalary> CreatePaginationConfiguration()
{
return new SysUserSalaryPaginationConfiguration();
}
}
}

View File

@ -38,6 +38,7 @@ namespace Sample.SqlServer
//.ReplaceService<IQueryTranslationPostprocessorFactory,SqlServer2008QueryTranslationPostprocessorFactory>()//支持sqlserver2008r2
);//使用链接字符串创建dbcontext
op.AddShardingTableRoute<SysUserModVirtualTableRoute>();
op.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
});
////不支持MARS不支持追踪的

View File

@ -1,6 +1,7 @@
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Sharding.Visitors;
namespace ShardingCore.Extensions
{
@ -29,7 +30,8 @@ namespace ShardingCore.Extensions
/// <returns></returns>
internal static IQueryable<T> RemoveSkip<T>(this IQueryable<T> source)
{
return (IQueryable<T>)source.Provider.CreateQuery(new RemoveSkipVisitor().Visit(source.Expression));
var expression = new RemoveSkipVisitor().Visit(source.Expression);
return (IQueryable<T>)source.Provider.CreateQuery(expression);
}
/// <summary>
@ -43,6 +45,17 @@ namespace ShardingCore.Extensions
var expression = new RemoveTakeVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
internal static IQueryable<T> RemoveOrderBy<T>(this IQueryable<T> source)
{
var expression = new RemoveOrderByVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
internal static IQueryable<T> RemoveOrderByDescending<T>(this IQueryable<T> source)
{
var expression = new RemoveOrderByDescendingVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
/// <summary>
/// 切换数据源,保留原数据源中的Expression

View File

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
namespace ShardingCore.Extensions.InternalExtensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 10:13:07
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal static class InternalLinqExtension
{
public static IEnumerable<TShource> OrderByIf<TShource, TKey>(this IEnumerable<TShource> source, Func<TShource, TKey> keySelector, bool condition,
IComparer<TKey>? comparer)
{
return condition ? source.OrderBy(keySelector, comparer) : source;
}
public static IEnumerable<TShource> OrderByDescendingIf<TShource, TKey>(this IEnumerable<TShource> source, Func<TShource, TKey> keySelector, bool condition,
IComparer<TKey>? comparer)
{
return condition ? source.OrderByDescending(keySelector, comparer) : source;
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Extensions.InternalExtensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 13:21:09
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal static class InternalPaginationMetadataExtension
{
internal static bool IsUseReverse(this PaginationMetadata paginationMetadata,int skip,long total)
{
if (total < paginationMetadata.ReverseTotalGe)
return false;
return paginationMetadata.ReverseFactor * total < skip;
}
}
}

View File

@ -72,6 +72,17 @@ namespace ShardingCore.Extensions
return null;
}
}
/// <summary>
/// 类型X是否包含某个属性
/// </summary>
/// <param name="type"></param>
/// <param name="propertyName"></param>
/// <returns></returns>
public static bool ContainPropertyName(this Type type, string propertyName)
{
var property = type.GetProperty(propertyName, _bindingFlags);
return property != null;
}
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.ShardingPage.Abstractions;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 10:36:51
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public static class ShardingQueryableExtension
{
public static async Task<ShardingPagedResult<T>> ToShardingPageAsync<T>(this IQueryable<T> source, int pageIndex, int pageSize)
{
//设置每次获取多少页
var take = pageSize <= 0 ? 1 : pageSize;
//设置当前页码最小1
var index = pageIndex <= 0 ? 1 : pageIndex;
//需要跳过多少页
var skip = (index - 1) * take;
var shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
using (shardingPageManager.CreateScope())
{
//获取每次总记录数
var count = await source.LongCountAsync();
if (count <= skip)
return new ShardingPagedResult<T>(new List<T>(0), count);
var data = await source.Skip(skip).Take(take).ToListAsync();
return new ShardingPagedResult<T>(data, count);
}
}
public static ShardingPagedResult<T> ToShardingPage<T>(this IQueryable<T> source, int pageIndex, int pageSize)
{
//设置每次获取多少页
var take = pageSize <= 0 ? 1 : pageSize;
//设置当前页码最小1
var index = pageIndex <= 0 ? 1 : pageIndex;
//需要跳过多少页
var skip = (index - 1) * take;
var shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
using (shardingPageManager.CreateScope())
{
//获取每次总记录数
var count = source.Count();
if (count <= skip)
return new ShardingPagedResult<T>(new List<T>(0), count);
var data = source.Skip(skip).Take(take).ToList();
return new ShardingPagedResult<T>(data, count);
}
}
}
}

View File

@ -134,125 +134,4 @@ namespace ShardingCore.Extensions
}
}
}
}
static class TaskExtension
{
/// <summary>
/// 是否成功
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
public static bool IsCompletedSuccessfully(this Task task)
{
return task.IsCompleted && !(task.IsCanceled || task.IsFaulted);
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitAndUnwrapException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <returns>The result of the task.</returns>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
return task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <returns>The result of the task.</returns>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
return task.Result;
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitWithoutException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait();
}
catch (AggregateException)
{
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed.</exception>
public static void WaitWithoutException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException)
{
cancellationToken.ThrowIfCancellationRequested();
}
}
}
}
}

View File

@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 15:30:32
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class InMemoryReverseStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
{
private readonly IStreamMergeAsyncEnumerator<T> _inMemoryStreamMergeAsyncEnumerator;
private bool _first = true;
private IEnumerator<T> _reverseEnumerator = Enumerable.Empty<T>().GetEnumerator();
public InMemoryReverseStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> inMemoryStreamMergeAsyncEnumerator)
{
_inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator;
}
public async ValueTask DisposeAsync()
{
await _inMemoryStreamMergeAsyncEnumerator.DisposeAsync();
_reverseEnumerator.Dispose();
}
public async ValueTask<bool> MoveNextAsync()
{
if (_first)
{
ICollection<T> _reverseCollection = new LinkedList<T>();
while(await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync())
{
_reverseCollection.Add(_inMemoryStreamMergeAsyncEnumerator.Current);
}
_reverseEnumerator = _reverseCollection.Reverse().GetEnumerator();
_first = false;
}
return _reverseEnumerator.MoveNext();
}
public T Current => _reverseEnumerator.Current;
public bool SkipFirst()
{
throw new NotImplementedException();
}
public bool HasElement()
{
throw new NotImplementedException();
}
public T ReallyCurrent => Current;
}
}

View File

@ -32,13 +32,15 @@ namespace ShardingCore.Sharding.PaginationConfigurations
return new PaginationOrderPropertyBuilder(orderPropertyExpression, _metadata);
}
/// <summary>
/// 配置当跳过多少条后开始启用只能分页
/// 配置反向排序 仅支持单排序 当skip>= reverseTotalGe*reverseFactor使用反向排序
/// </summary>
/// <param name="skip"></param>
/// <param name="reverseFactor"></param>
/// <param name="reverseTotalGe"></param>
/// <returns></returns>
public PaginationBuilder<TEntity> ConfigUseShardingPageIfGeSkip(long skip)
public PaginationBuilder<TEntity> ConfigReverseShardingPage(double reverseFactor=0.5,long reverseTotalGe=10000L)
{
_metadata.UseShardingPageIfGeSkipAvg = skip;
_metadata.ReverseFactor = reverseFactor;
_metadata.ReverseTotalGe = reverseTotalGe;
return this;
}
/// <summary>

View File

@ -30,10 +30,15 @@ namespace ShardingCore.Sharding.PaginationConfigurations
public IComparer<string> TailComparer { get; set; }
public PaginationMatchEnum PaginationMatchEnum { get; set; }
public PropertyInfo OrderPropertyInfo { get; set; }
/// <summary>
/// 如果查询没发现排序就将当前配置追加上去
/// </summary>
public bool AppendIfOrderNone { get; set; }
public bool AppendIfOrderNone => AppendOrder >= 0;
/// <summary>
/// 大于等于0表示需要
/// </summary>
public int AppendOrder { get; set; } = -1;
public string PropertyName { get;}

View File

@ -22,6 +22,6 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <summary>
/// 仅第一个匹配就可以了
/// </summary>
FirstMatch = 1 << 2
PrimaryMatch = 1 << 2
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ShardingCore.Sharding.PaginationConfigurations
@ -17,10 +18,20 @@ namespace ShardingCore.Sharding.PaginationConfigurations
public class PaginationMetadata
{
public ISet<PaginationConfig> PaginationConfigs = new HashSet<PaginationConfig>();
/// <summary>
/// 配置生效当跳过多少条后 GREATER THAN OR EQUAL
/// 反向排序因子
/// </summary>
public long UseShardingPageIfGeSkipAvg { get; set; } = 3000L;
public double ReverseFactor { get; set; } = -1;
/// <summary>
/// 当条数大于多少条后采用反向排序
/// </summary>
public long ReverseTotalGe { get; set; } = 10000L;
/// <summary>
/// 是否已开启反向排序 仅支持单排序
/// </summary>
public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 1000;
/// <summary>
/// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL
/// </summary>

View File

@ -44,10 +44,11 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <summary>
/// 如果查询没发现排序就将当前配置追加上去
/// </summary>
/// <param name="order">大于等于0生效,越大优先级越高</param>
/// <returns></returns>
public PaginationOrderPropertyBuilder UseAppendIfOrderNone()
public PaginationOrderPropertyBuilder UseAppendIfOrderNone(int order=0)
{
_paginationConfig.AppendIfOrderNone = true;
_paginationConfig.AppendOrder = order;
return this;
}
}

View File

@ -1,36 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/30 17:11:40
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AbstractShardingQueryExecutor<TEntity>: IShardingQueryExecutor<TEntity>
{
private readonly MethodCallExpression _expression;
private readonly IShardingDbContext _shardingDbContext;
public AbstractShardingQueryExecutor(MethodCallExpression expression,IShardingDbContext shardingDbContext)
{
_expression = expression;
_shardingDbContext = shardingDbContext;
}
public MethodCallExpression GetQueryExpression()
{
return _expression;
}
public IShardingDbContext GetCurrentShardingDbContext()
{
return _shardingDbContext;
}
}
}

View File

@ -4,9 +4,11 @@ using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines;
@ -41,33 +43,76 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
return new SingleQueryEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
//未开启系统分表或者本次查询涉及多张分表
if (!_streamMergeContext.IsPaginationQuery()||!_streamMergeContext.IsSingleShardingTableQuery()||_shardingPageManager.Current == null)
if (_streamMergeContext.IsPaginationQuery()&&_streamMergeContext.IsSingleShardingTableQuery()&&_shardingPageManager.Current != null)
{
return new DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
var shardingEntityType = _streamMergeContext.RouteResults.First().ReplaceTables.Single(o=>o.IsShardingTable()).EntityType;
var virtualTable = _virtualTableManager.GetVirtualTable(_streamMergeContext.GetShardingDbContext().ShardingDbContextType,shardingEntityType);
if (!virtualTable.EnablePagination)
return new DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
if (_streamMergeContext.Orders.IsEmpty())
{
var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone);
if (append != null)
//获取虚拟表判断是否启用了分页配置
var shardingEntityType = _streamMergeContext.RouteResults.First().ReplaceTables.First().EntityType;
var virtualTable = _virtualTableManager.GetVirtualTable(_streamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType);
if (virtualTable.EnablePagination)
{
123
return new SequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
var paginationMetadata = virtualTable.PaginationMetadata;
//判断本次查询的排序是否包含order如果不包含就获取默认添加的排序
if (_streamMergeContext.Orders.IsEmpty())
{
//除了判断属性名还要判断所属关系
var appendPaginationConfig = paginationMetadata.PaginationConfigs.OrderByDescending(o=>o.AppendOrder)
.FirstOrDefault(o => o.AppendIfOrderNone&&typeof(TEntity).ContainPropertyName(o.PropertyName)&& PaginationMatch(o));
if (appendPaginationConfig != null)
{
return new AppenOrderSequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults);
}
}
else
{
var primaryOrder = _streamMergeContext.Orders.First();
var appendPaginationConfig = paginationMetadata.PaginationConfigs.FirstOrDefault(o => PaginationMatch(o,primaryOrder));
if (appendPaginationConfig != null)
{
return new SequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
}
if (_streamMergeContext.Orders.Count() == 1)
{
//skip过大reserve skip
if (paginationMetadata.EnableReverseShardingPage &&_streamMergeContext.Take.GetValueOrDefault()>0)
{
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);
if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total))
{
return new ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity>(
_streamMergeContext, primaryOrder, total);
}
}
}
}
}
}
var propertyOrder = _streamMergeContext.Orders.First();
//PaginationMatchEnum.Owner
111
var paginationConfig = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.PropertyName==propertyOrder.PropertyExpression);
if (paginationConfig==null)
return new DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
//调用顺序排序
paginationConfig
return new DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
private bool PaginationMatch(PaginationConfig paginationConfig)
{
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&& !paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType;
return false;
}
private bool PaginationMatch(PaginationConfig paginationConfig,PropertyOrder propertyOrder)
{
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch))
{
if (!propertyOrder.PropertyExpression.StartsWith(paginationConfig.PropertyName))
return false;
}
if (propertyOrder.PropertyExpression != paginationConfig.PropertyName)
return false;
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&&!paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType;
return false;
}
}

View File

@ -31,7 +31,7 @@ namespace ShardingCore.Sharding
//public DataSourceRoutingResult RoutingResult { get; }
public int? Skip { get;}
public int? Take { get; }
public IEnumerable<PropertyOrder> Orders { get;}
public IEnumerable<PropertyOrder> Orders { get; private set; }
public SelectContext SelectContext { get;}
public GroupByContext GroupByContext { get; }
@ -68,7 +68,10 @@ namespace ShardingCore.Sharding
// GroupByContext = reWriteResult.GroupByContext;
// _reWriteSource = reWriteResult.ReWriteQueryable;
//}
public void ReSetOrders(IEnumerable<PropertyOrder> orders)
{
Orders = orders;
}
public DbContext CreateDbContext(RouteResult routeResult)
{
var routeTail = _routeTailFactory.Create(routeResult);

View File

@ -106,7 +106,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
public List<TResult> Execute<TResult>(Func<IQueryable, TResult> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var tableResult = _mergeContext.GetRouteResults();
var tableResult = _mergeContext.RouteResults;
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(() =>

View File

@ -11,6 +11,7 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeSync;
using ShardingCore.Sharding.ShardingQueryExecutors;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@ -24,15 +25,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com
*/
public class AsyncEnumerableStreamMergeEngine<T> : IAsyncEnumerable<T>, IEnumerable<T>, IDisposable
public class AsyncEnumerableStreamMergeEngine<T> : IAsyncEnumerable<T>, IEnumerable<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly ICollection<DbContext> _parllelDbbContexts;
public AsyncEnumerableStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_parllelDbbContexts = new LinkedList<DbContext>();
}
@ -46,7 +45,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines
}
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
return GetShardingEnumerator();
return new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken);
}
#endif
@ -62,47 +62,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return GetShardingEnumerator();
}
#endif
private IQueryable<T> CreateAsyncExecuteQueryable(RouteResult routeResult,int routeCount)
{
var shardingDbContext = _mergeContext.CreateDbContext(routeResult);
var useOriginal = routeCount>1;
_parllelDbbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>)(useOriginal?_mergeContext.GetReWriteQueryable():_mergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
private IAsyncEnumerator<T> GetShardingEnumerator()
{
var tableResult = _mergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (routeCount>1&&_mergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
}
private IEnumerator<T> GetEnumerator(IQueryable<T> newQueryable)
@ -114,33 +74,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
var tableResult = _mergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(() =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var enumerator = GetEnumerator(newQueryable);
return new StreamMergeEnumerator<T>(enumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (routeCount > 1 && _mergeContext.HasSkipTake())
return new PaginationStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
return new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync()
.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
@ -148,22 +84,5 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return GetEnumerator();
}
public void Dispose()
{
if (_parllelDbbContexts.IsNotEmpty())
{
_parllelDbbContexts.ForEach(o =>
{
try
{
o.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
}
});
}
}
}
}

View File

@ -0,0 +1,100 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 8:09:18
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private readonly PaginationConfig _appendPaginationConfig;
private readonly ICollection<RouteQueryResult<long>> _routeQueryResults;
private IShardingPageManager _shardingPageManager;
private IVirtualTableManager _virtualTableManager;
public AppenOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationConfig appendPaginationConfig, ICollection<RouteQueryResult<long>> routeQueryResults) : base(streamMergeContext)
{
_appendPaginationConfig = appendPaginationConfig;
_routeQueryResults = routeQueryResults;
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
if (skip < 0)
throw new ShardingCoreException("skip must ge 0");
var take = StreamMergeContext.Take;
if (take.HasValue&&take.Value <= 0)
throw new ShardingCoreException("take must gt 0");
var sortRouteResults = _routeQueryResults.Select(o => new
{
Tail = o.RouteResult.ReplaceTables.First().Tail,
RouteQueryResult = o
}).OrderBy(o => o.Tail, _appendPaginationConfig.TailComparer).ToList();
var skipCount = skip;
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o=>o.RouteQueryResult)).Skip(skip).Take(take).ToList();
StreamMergeContext.ReSetOrders(new PropertyOrder[] { new PropertyOrder(_appendPaginationConfig.PropertyName, true) });
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
return streamEnumerators;
}
private IQueryable<TEntity> CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.RouteResult);
DbContextQueryStore.TryAdd(sequenceResult.RouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(new PropertyOrder[]{new PropertyOrder(_appendPaginationConfig.PropertyName,true)}))
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,101 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 8:31:20
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class SequencePaginationList
{
private readonly IEnumerable<RouteQueryResult<long>> _routeQueryResults;
private long? _skip;
private long? _take;
public SequencePaginationList(IEnumerable<RouteQueryResult<long>> routeQueryResults)
{
_routeQueryResults = routeQueryResults;
}
public SequencePaginationList Skip(long? skip)
{
_skip = skip;
return this;
}
public SequencePaginationList Take(long? take)
{
_take = take;
return this;
}
public ICollection<SequenceResult> ToList()
{
ICollection<SequenceResult> routeResults = new LinkedList<SequenceResult>();
var currentSkip = _skip.GetValueOrDefault();
var currentTake = _take;
bool stopSkip = false;
bool needBreak = false;
foreach (var routeQueryResult in _routeQueryResults)
{
if (!stopSkip)
{
if (routeQueryResult.QueryResult >= currentSkip)
{
stopSkip = true;
}
else
{
currentSkip = currentSkip - routeQueryResult.QueryResult;
continue;
}
}
var currentRealSkip = currentSkip;
var currentRealTake = routeQueryResult.QueryResult-currentSkip;
if (currentSkip != 0)
currentSkip = 0;
if (currentTake.HasValue)
{
if (currentTake.Value <= currentRealTake)
{
currentRealTake = currentTake.Value;
needBreak = true;
}
else
{
currentRealTake = currentTake.Value-currentRealTake;
}
}
var sequenceResult = new SequenceResult(currentRealSkip, currentRealTake, routeQueryResult.RouteResult);
routeResults.Add(sequenceResult);
if (needBreak)
break;
}
return routeResults;
}
}
public class SequenceResult
{
public SequenceResult(long skip, long take, RouteResult routeResult)
{
Skip = (int)skip;
Take = (int)take;
RouteResult = routeResult;
}
public int Skip { get; }
public int Take { get; }
public RouteResult RouteResult { get; }
}
}

View File

@ -0,0 +1,91 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 13:32:12
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private readonly PropertyOrder _primaryOrder;
private readonly long _total;
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PropertyOrder primaryOrder, long total) : base(streamMergeContext)
{
_primaryOrder = primaryOrder;
_total = total;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var noPaginationNoOrderQueryable = _primaryOrder.IsAsc ? StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderBy(): StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderByDescending();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
var take = StreamMergeContext.Take.GetValueOrDefault();
var realSkip = _total- take- skip;
var tableResult = StreamMergeContext.RouteResults;
var reverseOrderQueryable = noPaginationNoOrderQueryable.Skip((int)realSkip).Take((int)realSkip+take).OrderWithExpression(new List<PropertyOrder>()
{
new PropertyOrder( _primaryOrder.PropertyExpression,!_primaryOrder.IsAsc)
});
var enumeratorTasks = tableResult.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(reverseOrderQueryable,routeResult);
return Task.Run(async () =>
{
try
{
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
return streamEnumerators;
}
private IQueryable<TEntity> CreateAsyncExecuteQueryable(IQueryable<TEntity> reverseOrderQueryable, RouteResult routeResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return new InMemoryReverseStreamMergeAsyncEnumerator<TEntity>(doGetStreamMergeAsyncEnumerator);
}
private IStreamMergeAsyncEnumerator<TEntity> DoGetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -3,14 +3,18 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
@ -23,47 +27,43 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
*/
public class SequenceEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private IShardingPageManager _shardingPageManager;
private IVirtualTableManager _virtualTableManager;
public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
private readonly PaginationConfig _orderPaginationConfig;
private readonly ICollection<RouteQueryResult<long>> _routeQueryResults;
private readonly bool _isAsc;
public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationConfig orderPaginationConfig, ICollection<RouteQueryResult<long>> routeQueryResults, bool isAsc) : base(streamMergeContext)
{
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
_orderPaginationConfig = orderPaginationConfig;
_routeQueryResults = routeQueryResults;
_isAsc = isAsc;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var routeQueryResults = _shardingPageManager.Current.RouteQueryResults.ToList();
if (routeQueryResults.Any(o => o.RouteResult.ReplaceTables.Count(p=>p.IsShardingTable())!=1)
|| routeQueryResults.HasDifference(o=>o.RouteResult.ReplaceTables.First().EntityType))
throw new InvalidOperationException($"error sharding page:[{StreamMergeContext.GetOriginalQueryable().Expression.ShardingPrint()}]");
var shardingEntityType = routeQueryResults[0].RouteResult.ReplaceTables.FirstOrDefault(o=>o.IsShardingTable()).EntityType;
var virtualTable = _virtualTableManager.GetVirtualTable(StreamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType);
if (!virtualTable.EnablePagination)
{
throw new ShardingCoreException("not support Sequence enumerator");
}
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
if (skip < 0)
throw new ShardingCoreException("skip must ge 0");
if (base.StreamMergeContext.Orders.IsEmpty())
{
var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone);
if (append != null)
{
StreamMergeContext.GetOriginalQueryable().OrderBy("")
}
}
var take = StreamMergeContext.Take;
if (take.HasValue && take.Value <= 0)
throw new ShardingCoreException("take must gt 0");
var tableResult = StreamMergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
var sortRouteResults = _routeQueryResults.Select(o => new
{
Tail = o.RouteResult.ReplaceTables.First().Tail,
RouteQueryResult = o
}).OrderByIf(o => o.Tail, _isAsc, _orderPaginationConfig.TailComparer)
.OrderByDescendingIf(o => o.Tail, !_isAsc, _orderPaginationConfig.TailComparer).ToList();
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList();
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(noPaginationQueryable, sequenceResult);
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
@ -75,23 +75,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
return streamEnumerators;
}
public override IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
private IQueryable<TEntity> CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.RouteResult);
DbContextQueryStore.TryAdd(sequenceResult.RouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take))
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (_multiRouteQuery && StreamMergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);

View File

@ -1,3 +1,5 @@
using System;
namespace ShardingCore.Core.Internal.Visitors
{
/*

View File

@ -48,11 +48,11 @@ namespace ShardingCore.Core.Internal.Visitors
var method = methodCallExpression.Method;
if (method.Name == nameof(Queryable.Count) || method.Name == nameof(Queryable.Sum) || method.Name == nameof(Queryable.Max) || method.Name == nameof(Queryable.Min) || method.Name == nameof(Queryable.Average))
{
_selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].Name,true,method.Name));
_selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].DeclaringType, node.Members[i].Name,true,method.Name));
continue;
}
}
_selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].Name,false,string.Empty));
_selectContext.SelectProperties.Add(new SelectProperty(node.Members[i].DeclaringType, node.Members[i].Name,false,string.Empty));
}
return base.VisitNew(node);
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
namespace ShardingCore.Sharding.Visitors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 15:05:27
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class RemoveOrderByDescendingVisitor : ExpressionVisitor
{
protected override Expression VisitMethodCall(MethodCallExpression node)
{
if (node.Method.Name == nameof(Queryable.OrderByDescending))
return base.Visit(node.Arguments[0]);
return base.VisitMethodCall(node);
}
}
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
namespace ShardingCore.Sharding.Visitors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/3 15:04:25
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class RemoveOrderByVisitor : ExpressionVisitor
{
protected override Expression VisitMethodCall(MethodCallExpression node)
{
if (node.Method.Name == nameof(Queryable.OrderBy))
return base.Visit(node.Arguments[0]);
return base.VisitMethodCall(node);
}
}
}

View File

@ -10,13 +10,14 @@ namespace ShardingCore.Core.Internal.Visitors.Selects
*/
public class SelectProperty
{
public SelectProperty(string propertyName,bool isAggregateMethod,string aggregateMethod)
public SelectProperty( Type ownerType,string propertyName,bool isAggregateMethod,string aggregateMethod)
{
OwnerType = ownerType;
PropertyName = propertyName;
IsAggregateMethod = isAggregateMethod;
AggregateMethod = aggregateMethod;
}
public Type OwnerType { get; }
public string PropertyName { get; }
public bool IsAggregateMethod { get; }
public string AggregateMethod { get; }