This commit is contained in:
xuejmnet 2021-09-04 21:23:37 +08:00
parent 2dfe49dc9c
commit e4f6c373e8
7 changed files with 91 additions and 45 deletions

View File

@ -56,6 +56,11 @@ namespace ShardingCore.Extensions
var expression = new RemoveOrderByDescendingVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
internal static IQueryable<T> RemoveAnyOrderBy<T>(this IQueryable<T> source)
{
var expression = new RemoveAnyOrderVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
/// <summary>
/// 切换数据源,保留原数据源中的Expression

View File

@ -32,10 +32,15 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// 是否已开启反向排序 仅支持单排序
/// </summary>
public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 500;
/// <summary>
/// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL
/// </summary>
public int TakeInMemoryCountIfLe { get; set; } = 100;
// /// <summary>
// /// 当出现N张表分页需要跳过X条数据,获取Y条数据除了total条数最多的那张表以外的其他表和小于TakeInMemoryMaxRangeSkip那么就启用
// /// </summary>
// public int TakeInMemoryMaxRangeSkip { get; set; } = 1000;
//
// public bool EnableTakeInMemory(int skip)
// {
// return skip > TakeInMemoryMaxRangeSkip && TakeInMemoryMaxRangeSkip > 500;
// }
}
}

View File

@ -36,6 +36,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
}
public IEnumeratorStreamMergeEngine<TEntity> ExecuteAsync(CancellationToken cancellationToken = new CancellationToken())
{
//操作单表
@ -43,8 +44,9 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
{
return new SingleQueryEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
//未开启系统分表或者本次查询涉及多张分表
if (_streamMergeContext.IsPaginationQuery()&&_streamMergeContext.IsSingleShardingTableQuery()&&_shardingPageManager.Current != null)
if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSingleShardingTableQuery() && _shardingPageManager.Current != null)
{
//获取虚拟表判断是否启用了分页配置
var shardingEntityType = _streamMergeContext.RouteResults.First().ReplaceTables.First().EntityType;
@ -56,8 +58,8 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
if (_streamMergeContext.Orders.IsEmpty())
{
//除了判断属性名还要判断所属关系
var appendPaginationConfig = paginationMetadata.PaginationConfigs.OrderByDescending(o=>o.AppendOrder)
.FirstOrDefault(o => o.AppendIfOrderNone&&typeof(TEntity).ContainPropertyName(o.PropertyName)&& PaginationMatch(o));
var appendPaginationConfig = paginationMetadata.PaginationConfigs.OrderByDescending(o => o.AppendOrder)
.FirstOrDefault(o => o.AppendIfOrderNone && typeof(TEntity).ContainPropertyName(o.PropertyName) && PaginationMatch(o));
if (appendPaginationConfig != null)
{
return new AppenOrderSequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults);
@ -65,23 +67,31 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
}
else
{
var orderCount = _streamMergeContext.Orders.Count();
var primaryOrder = _streamMergeContext.Orders.First();
var appendPaginationConfig = paginationMetadata.PaginationConfigs.FirstOrDefault(o => PaginationMatch(o,primaryOrder));
if (appendPaginationConfig != null)
if (orderCount == 1)
{
return new SequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, appendPaginationConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
}
if (_streamMergeContext.Orders.Count() == 1)
{
//skip过大reserve skip
if (paginationMetadata.EnableReverseShardingPage &&_streamMergeContext.Take.GetValueOrDefault()>0)
var sequenceFullMatchOrderConfig = paginationMetadata.PaginationConfigs.Where(o => !o.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch)).FirstOrDefault(o => PaginationPrimaryMatch(o, primaryOrder));
if (sequenceFullMatchOrderConfig != null)
{
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);
if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total))
{
return new ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity>(
_streamMergeContext, primaryOrder, total);
}
return new SequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, sequenceFullMatchOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
}
}
var sequencePrimaryMatchOrderConfig = paginationMetadata.PaginationConfigs.Where(o => o.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch)).FirstOrDefault(o => PaginationPrimaryMatch(o, primaryOrder));
if (sequencePrimaryMatchOrderConfig != null)
{
return new SequenceEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext, sequencePrimaryMatchOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
}
//skip过大reserve skip
if (paginationMetadata.EnableReverseShardingPage && _streamMergeContext.Take.GetValueOrDefault() > 0)
{
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);
if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total))
{
return new ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity>(
_streamMergeContext, _streamMergeContext.Orders, total);
}
}
}
@ -94,27 +104,21 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
private bool PaginationMatch(PaginationConfig paginationConfig)
{
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&& !paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner) && !paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType;
return false;
}
private bool PaginationMatch(PaginationConfig paginationConfig,PropertyOrder propertyOrder)
{
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch))
{
if (!propertyOrder.PropertyExpression.StartsWith(paginationConfig.PropertyName))
return false;
}
private bool PaginationPrimaryMatch(PaginationConfig paginationConfig, PropertyOrder propertyOrder)
{
if (propertyOrder.PropertyExpression != paginationConfig.PropertyName)
return false;
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)&&!paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
if (paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner) && !paginationConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
return typeof(TEntity) == paginationConfig.OrderPropertyInfo.DeclaringType;
return false;
}
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base
{
@ -25,11 +26,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
}
public SequencePaginationList Skip(long? skip)
{
if (skip > int.MaxValue)
throw new ShardingCoreException($"not support skip more than {int.MaxValue}");
_skip = skip;
return this;
}
public SequencePaginationList Take(long? take)
{
if (take > int.MaxValue)
throw new ShardingCoreException($"not support take more than {int.MaxValue}");
_take = take;
return this;
}

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
@ -20,30 +21,28 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
*/
public class ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private readonly PropertyOrder _primaryOrder;
private readonly IEnumerable<PropertyOrder> _primaryOrders;
private readonly long _total;
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PropertyOrder primaryOrder, long total) : base(streamMergeContext)
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, IEnumerable<PropertyOrder> primaryOrders, long total) : base(streamMergeContext)
{
_primaryOrder = primaryOrder;
_primaryOrders = primaryOrders;
_total = total;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var noPaginationNoOrderQueryable = _primaryOrder.IsAsc ? StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderBy(): StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveOrderByDescending();
var noPaginationNoOrderQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveAnyOrderBy();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
var take = StreamMergeContext.Take.GetValueOrDefault();
var take = StreamMergeContext.Take.HasValue?StreamMergeContext.Take.Value:(_total-skip);
if (take > int.MaxValue)
throw new ShardingCoreException($"not support take more than {int.MaxValue}");
var realSkip = _total- take- skip;
var tableResult = StreamMergeContext.RouteResults;
StreamMergeContext.ReSetSkip((int)realSkip);
var propertyOrders = new List<PropertyOrder>()
{
new PropertyOrder( _primaryOrder.PropertyExpression,!_primaryOrder.IsAsc)
};
var propertyOrders =_primaryOrders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray();
StreamMergeContext.ReSetOrders(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+take).OrderWithExpression(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders);
var enumeratorTasks = tableResult.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(reverseOrderQueryable,routeResult);

View File

@ -2,7 +2,6 @@ using System.Linq;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync

View File

@ -0,0 +1,29 @@
using System;
using System.Linq;
using System.Linq.Expressions;
namespace ShardingCore.Sharding.Visitors
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 04 September 2021 21:04:34
* @Email: 326308290@qq.com
*/
public class RemoveAnyOrderVisitor: ExpressionVisitor
{
protected override Expression VisitMethodCall(MethodCallExpression node)
{
if (node.Method.Name == nameof(Queryable.OrderBy))
return base.Visit(node.Arguments[0]);
if (node.Method.Name == nameof(Queryable.OrderByDescending))
return base.Visit(node.Arguments[0]);
if (node.Method.Name == nameof(Queryable.ThenBy))
return base.Visit(node.Arguments[0]);
if (node.Method.Name == nameof(Queryable.ThenByDescending))
return base.Visit(node.Arguments[0]);
return base.VisitMethodCall(node);
}
}
}