优化代码并且支持Max和Min的断路
This commit is contained in:
parent
6972d428b9
commit
e53b7d4e59
|
@ -14,6 +14,7 @@ using Microsoft.EntityFrameworkCore.Migrations;
|
|||
using Sample.Migrations.EFCores;
|
||||
using ShardingCore;
|
||||
using ShardingCore.Bootstrapers;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
|
||||
|
||||
namespace Sample.Migrations
|
||||
{
|
||||
|
@ -32,6 +33,19 @@ namespace Sample.Migrations
|
|||
|
||||
services.AddControllers();
|
||||
|
||||
//services.AddDbContext<DefaultShardingTableDbContext>((sp, builder) =>
|
||||
//{
|
||||
// var virtualDataSource =
|
||||
// sp.GetRequiredService<IVirtualDataSourceManager<DefaultShardingTableDbContext>>()
|
||||
// .GetCurrentVirtualDataSource();
|
||||
// var connectionString = virtualDataSource.GetConnectionString(virtualDataSource.DefaultDataSourceName);
|
||||
// virtualDataSource.ConfigurationParams.UseDbContextOptionsBuilder(connectionString, builder)
|
||||
// .UseSharding<DefaultShardingTableDbContext>();
|
||||
//});
|
||||
|
||||
|
||||
//services.AddShardingConfigure<DefaultShardingTableDbContext>().ShardingEntityConfigOptions...
|
||||
|
||||
//services.AddShardingDbContext<DefaultShardingTableDbContext>(
|
||||
// (conn, o) =>
|
||||
// o.UseSqlServer(conn)
|
||||
|
|
|
@ -222,6 +222,7 @@ namespace Sample.SqlServer.Controllers
|
|||
public async Task<IActionResult> Get4()
|
||||
{
|
||||
var xxxaaa = await _defaultTableDbContext.Set<SysUserSalary>().FirstOrDefaultAsync();
|
||||
|
||||
Console.WriteLine("----0----");
|
||||
var xxx = await _defaultTableDbContext.Set<SysUserSalary>().OrderByDescending(o=>o.DateOfMonth).FirstOrDefaultAsync();
|
||||
Console.WriteLine("----1----");
|
||||
|
@ -231,6 +232,9 @@ namespace Sample.SqlServer.Controllers
|
|||
Console.WriteLine("----3----");
|
||||
var xxx21 = await _defaultTableDbContext.Set<SysUserSalary>().OrderByDescending(o => o.DateOfMonth).LastAsync();
|
||||
Console.WriteLine("----4----");
|
||||
|
||||
await _defaultTableDbContext.Set<SysUserSalary>().MaxAsync(o => o.DateOfMonth);
|
||||
await _defaultTableDbContext.Set<SysUserSalary>().MinAsync(o => o.DateOfMonth);
|
||||
return Ok(new{ xxx , xxx1});
|
||||
}
|
||||
|
||||
|
|
|
@ -8,13 +8,35 @@ namespace Sample.SqlServer.Shardings
|
|||
{
|
||||
public void Configure(EntityQueryBuilder<SysUserSalary> builder)
|
||||
{
|
||||
//当前表示按月分片,月份的排序字符串和int是一样的所以用某人的即可
|
||||
builder.ShardingTailComparer(Comparer<string>.Default);
|
||||
//DateOfMonth的排序和月份分片的后缀一致所以用true如果false,无果无关就不需要配置
|
||||
builder.AddOrder(o => o.DateOfMonth,true);
|
||||
builder.AddConnectionsLimit(2, QueryableMethodNameEnum.First, QueryableMethodNameEnum.FirstOrDefault,QueryableMethodNameEnum.Any,QueryableMethodNameEnum.LastOrDefault,QueryableMethodNameEnum.Last);
|
||||
////SysUserSalary表是按月分片,月份的排序字符串和int是一样的所以用默认的即可
|
||||
//#region 第一种仅配置后缀比较器或者不配置(默认就是字符串比较器)
|
||||
////当前情况下只有Any All Contains会进行中断
|
||||
//builder.ShardingTailComparer(Comparer<string>.Default);
|
||||
//#endregion
|
||||
|
||||
//#region 第二种配置后缀比较器并且配置排序相对于比较器的
|
||||
//builder.ShardingTailComparer(Comparer<string>.Default);
|
||||
////DateOfMonth的排序和月份分片的后缀一致所以用true如果false,无果无关就不需要配置
|
||||
//builder.AddOrder(o => o.DateOfMonth, true);
|
||||
//#endregion
|
||||
|
||||
//#region 第三种配置后缀比较器并且配置排序相对于比较器的
|
||||
//builder.ShardingTailComparer(Comparer<string>.Default);
|
||||
//builder.AddDefaultSequenceQueryTrip(false, CircuitBreakerMethodNameEnum.FirstOrDefault);
|
||||
//#endregion
|
||||
|
||||
|
||||
#region 第四种
|
||||
builder.ShardingTailComparer(Comparer<string>.Default, false);
|
||||
//DateOfMonth的排序和月份分片的后缀一致所以用true如果false,无果无关就不需要配置
|
||||
builder.AddOrder(o => o.DateOfMonth, false);
|
||||
builder.AddDefaultSequenceQueryTrip(false, CircuitBreakerMethodNameEnum.FirstOrDefault);
|
||||
#endregion
|
||||
|
||||
|
||||
|
||||
builder.AddConnectionsLimit(2, LimitMethodNameEnum.First, LimitMethodNameEnum.FirstOrDefault, LimitMethodNameEnum.Any, LimitMethodNameEnum.LastOrDefault, LimitMethodNameEnum.Last, LimitMethodNameEnum.Max, LimitMethodNameEnum.Min);
|
||||
|
||||
builder.AddDefaultSequenceQueryTrip(false, QueryableMethodNameEnum.FirstOrDefault);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ namespace ShardingCore.Sharding.Abstractions
|
|||
public interface ISeqQueryProvider
|
||||
{
|
||||
bool IsSeqQuery();
|
||||
bool IsParallelExecute();
|
||||
|
||||
bool CanTrip();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,11 +25,13 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// 添加分表后缀排序
|
||||
/// </summary>
|
||||
/// <param name="tailComparer"></param>
|
||||
/// <param name="reverse">是否和tailComparer排序相反</param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="ArgumentNullException"></exception>
|
||||
public EntityQueryBuilder<TEntity> ShardingTailComparer(IComparer<string> tailComparer)
|
||||
public EntityQueryBuilder<TEntity> ShardingTailComparer(IComparer<string> tailComparer,bool reverse = true)
|
||||
{
|
||||
_entityQueryMetadata.DefaultTailComparer = tailComparer ?? throw new ArgumentNullException(nameof(tailComparer));
|
||||
_entityQueryMetadata.DefaultTailComparerNeedReverse = reverse;
|
||||
return this;
|
||||
}
|
||||
/// <summary>
|
||||
|
@ -37,11 +39,11 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// </summary>
|
||||
/// <typeparam name="TProperty"></typeparam>
|
||||
/// <param name="primaryOrderPropertyExpression"></param>
|
||||
/// <param name="isAsc">true:当前属性正序和comparer正序一样,false:当前属性倒序和comparer正序一样</param>
|
||||
/// <param name="isSameAsShardingTailComparer">true:当前属性正序和comparer正序一样,false:当前属性倒序和comparer正序一样</param>
|
||||
/// <returns></returns>
|
||||
public EntityQueryBuilder<TEntity> AddOrder<TProperty>(Expression<Func<TEntity, TProperty>> primaryOrderPropertyExpression,bool isAsc=true)
|
||||
public EntityQueryBuilder<TEntity> AddOrder<TProperty>(Expression<Func<TEntity, TProperty>> primaryOrderPropertyExpression,bool isSameAsShardingTailComparer = true)
|
||||
{
|
||||
_entityQueryMetadata.AddSeqComparerOrder(primaryOrderPropertyExpression.GetPropertyAccess().Name, isAsc);
|
||||
_entityQueryMetadata.AddSeqComparerOrder(primaryOrderPropertyExpression.GetPropertyAccess().Name, isSameAsShardingTailComparer);
|
||||
return this;
|
||||
}
|
||||
/// <summary>
|
||||
|
@ -51,7 +53,7 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// <param name="methodNames">查询方法</param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="ArgumentNullException"></exception>
|
||||
public EntityQueryBuilder<TEntity> AddConnectionsLimit(int connectionsLimit,params QueryableMethodNameEnum[] methodNames)
|
||||
public EntityQueryBuilder<TEntity> AddConnectionsLimit(int connectionsLimit,params LimitMethodNameEnum[] methodNames)
|
||||
{
|
||||
if (connectionsLimit < 1)
|
||||
throw new ArgumentNullException($"{nameof(connectionsLimit)} should >= 1");
|
||||
|
@ -64,16 +66,16 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// <summary>
|
||||
/// 配置默认方法不带排序的时候采用什么排序来触发熔断
|
||||
/// </summary>
|
||||
/// <param name="asc"></param>
|
||||
/// <param name="isSameAsShardingTailComparer">true表示和默认的ShardingTailComparer排序一致,false表示和磨人的排序相反</param>
|
||||
/// <param name="methodNames"></param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="ArgumentNullException"></exception>
|
||||
public EntityQueryBuilder<TEntity> AddDefaultSequenceQueryTrip(bool asc,params QueryableMethodNameEnum[] methodNames)
|
||||
public EntityQueryBuilder<TEntity> AddDefaultSequenceQueryTrip(bool isSameAsShardingTailComparer,params CircuitBreakerMethodNameEnum[] methodNames)
|
||||
{
|
||||
|
||||
foreach (var methodName in methodNames)
|
||||
{
|
||||
_entityQueryMetadata.AddDefaultSequenceQueryTrip(asc,methodName);
|
||||
_entityQueryMetadata.AddDefaultSequenceQueryTrip(isSameAsShardingTailComparer, methodName);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -6,26 +6,48 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
{
|
||||
public class EntityQueryMetadata
|
||||
{
|
||||
private static readonly IDictionary<QueryableMethodNameEnum, string> MethodNameSupports;
|
||||
private static readonly IDictionary<CircuitBreakerMethodNameEnum, string> CircuitBreakerMethodNameSupports;
|
||||
private static readonly IDictionary<LimitMethodNameEnum, string> LimitMethodNameSupports;
|
||||
|
||||
static EntityQueryMetadata()
|
||||
{
|
||||
MethodNameSupports = new Dictionary<QueryableMethodNameEnum, string>()
|
||||
CircuitBreakerMethodNameSupports = new Dictionary<CircuitBreakerMethodNameEnum, string>()
|
||||
{
|
||||
{ QueryableMethodNameEnum.First, nameof(Queryable.First) },
|
||||
{ QueryableMethodNameEnum.FirstOrDefault, nameof(Queryable.FirstOrDefault) },
|
||||
{ QueryableMethodNameEnum.Last, nameof(Queryable.Last) },
|
||||
{ QueryableMethodNameEnum.LastOrDefault, nameof(Queryable.LastOrDefault) },
|
||||
{ QueryableMethodNameEnum.Single, nameof(Queryable.Single) },
|
||||
{ QueryableMethodNameEnum.SingleOrDefault, nameof(Queryable.SingleOrDefault) },
|
||||
{ QueryableMethodNameEnum.Any, nameof(Queryable.Any) },
|
||||
{ QueryableMethodNameEnum.All, nameof(Queryable.All) },
|
||||
{ QueryableMethodNameEnum.Contains, nameof(Queryable.Contains) }
|
||||
{ CircuitBreakerMethodNameEnum.First, nameof(Queryable.First) },
|
||||
{ CircuitBreakerMethodNameEnum.FirstOrDefault, nameof(Queryable.FirstOrDefault) },
|
||||
{ CircuitBreakerMethodNameEnum.Last, nameof(Queryable.Last) },
|
||||
{ CircuitBreakerMethodNameEnum.LastOrDefault, nameof(Queryable.LastOrDefault) },
|
||||
{ CircuitBreakerMethodNameEnum.Single, nameof(Queryable.Single) },
|
||||
{ CircuitBreakerMethodNameEnum.SingleOrDefault, nameof(Queryable.SingleOrDefault) },
|
||||
{ CircuitBreakerMethodNameEnum.Any, nameof(Queryable.Any) },
|
||||
{ CircuitBreakerMethodNameEnum.All, nameof(Queryable.All) },
|
||||
{ CircuitBreakerMethodNameEnum.Contains, nameof(Queryable.Contains) },
|
||||
{ CircuitBreakerMethodNameEnum.Max, nameof(Queryable.Max) },
|
||||
{ CircuitBreakerMethodNameEnum.Min, nameof(Queryable.Min) }
|
||||
};
|
||||
LimitMethodNameSupports = new Dictionary<LimitMethodNameEnum, string>()
|
||||
{
|
||||
{ LimitMethodNameEnum.First, nameof(Queryable.First) },
|
||||
{ LimitMethodNameEnum.FirstOrDefault, nameof(Queryable.FirstOrDefault) },
|
||||
{ LimitMethodNameEnum.Last, nameof(Queryable.Last) },
|
||||
{ LimitMethodNameEnum.LastOrDefault, nameof(Queryable.LastOrDefault) },
|
||||
{ LimitMethodNameEnum.Single, nameof(Queryable.Single) },
|
||||
{ LimitMethodNameEnum.SingleOrDefault, nameof(Queryable.SingleOrDefault) },
|
||||
{ LimitMethodNameEnum.Any, nameof(Queryable.Any) },
|
||||
{ LimitMethodNameEnum.All, nameof(Queryable.All) },
|
||||
{ LimitMethodNameEnum.Contains, nameof(Queryable.Contains) },
|
||||
{ LimitMethodNameEnum.Max, nameof(Queryable.Max) },
|
||||
{ LimitMethodNameEnum.Min, nameof(Queryable.Min) },
|
||||
{ LimitMethodNameEnum.Count, nameof(Queryable.Count) },
|
||||
{ LimitMethodNameEnum.LongCount, nameof(Queryable.LongCount) },
|
||||
{ LimitMethodNameEnum.Sum, nameof(Queryable.Sum) },
|
||||
{ LimitMethodNameEnum.Average, nameof(Queryable.Average) }
|
||||
};
|
||||
}
|
||||
|
||||
private readonly IDictionary<string,bool> _seqQueryOrders;
|
||||
public IComparer<string> DefaultTailComparer { get; set; }
|
||||
public bool DefaultTailComparerNeedReverse { get; set; }
|
||||
|
||||
private readonly IDictionary<string, int> _seqConnectionsLimit;
|
||||
private readonly IDictionary<string, bool> _seqQueryDefaults;
|
||||
|
@ -36,6 +58,7 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
{
|
||||
_seqQueryOrders = new Dictionary<string, bool>();
|
||||
DefaultTailComparer =Comparer<string>.Default;
|
||||
DefaultTailComparerNeedReverse = true;
|
||||
_seqConnectionsLimit = new Dictionary<string, int>();
|
||||
_seqQueryDefaults = new Dictionary<string, bool>();
|
||||
}
|
||||
|
@ -44,16 +67,16 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// 添加和默认数据库排序一样的排序
|
||||
/// </summary>
|
||||
/// <param name="propertyName"></param>
|
||||
/// <param name="isAsc"></param>
|
||||
public void AddSeqComparerOrder(string propertyName,bool isAsc)
|
||||
/// <param name="isSameAsShardingTailComparer"></param>
|
||||
public void AddSeqComparerOrder(string propertyName,bool isSameAsShardingTailComparer)
|
||||
{
|
||||
if (_seqQueryOrders.ContainsKey(propertyName))
|
||||
{
|
||||
_seqQueryOrders[propertyName] = isAsc;
|
||||
_seqQueryOrders[propertyName] = isSameAsShardingTailComparer;
|
||||
}
|
||||
else
|
||||
{
|
||||
_seqQueryOrders.Add(propertyName, isAsc);
|
||||
_seqQueryOrders.Add(propertyName, isSameAsShardingTailComparer);
|
||||
}
|
||||
}
|
||||
/// <summary>
|
||||
|
@ -62,9 +85,9 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// <param name="limit"></param>
|
||||
/// <param name="methodNameEnum"></param>
|
||||
/// <exception cref="ArgumentException"></exception>
|
||||
public void AddConnectionsLimit(int limit, QueryableMethodNameEnum methodNameEnum)
|
||||
public void AddConnectionsLimit(int limit, LimitMethodNameEnum methodNameEnum)
|
||||
{
|
||||
if (!MethodNameSupports.TryGetValue(methodNameEnum, out var methodName))
|
||||
if (!LimitMethodNameSupports.TryGetValue(methodNameEnum, out var methodName))
|
||||
{
|
||||
throw new ArgumentException(methodNameEnum.ToString());
|
||||
}
|
||||
|
@ -99,39 +122,39 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
/// 是否包含当前排序字段
|
||||
/// </summary>
|
||||
/// <param name="propertyName"></param>
|
||||
/// <param name="asc"></param>
|
||||
/// <param name="isSameAsShardingTailComparer"></param>
|
||||
/// <returns></returns>
|
||||
public bool TryContainsComparerOrder(string propertyName,out bool asc)
|
||||
public bool TryContainsComparerOrder(string propertyName,out bool isSameAsShardingTailComparer)
|
||||
{
|
||||
if (_seqQueryOrders.TryGetValue(propertyName, out var v))
|
||||
{
|
||||
asc = v;
|
||||
isSameAsShardingTailComparer = v;
|
||||
return true;
|
||||
}
|
||||
asc = false;
|
||||
isSameAsShardingTailComparer = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 默认顺序查询熔断
|
||||
/// </summary>
|
||||
/// <param name="asc"></param>
|
||||
/// <param name="isSameAsShardingTailComparer"></param>
|
||||
/// <param name="methodNameEnum"></param>
|
||||
/// <exception cref="ArgumentException"></exception>
|
||||
public void AddDefaultSequenceQueryTrip(bool asc,QueryableMethodNameEnum methodNameEnum)
|
||||
public void AddDefaultSequenceQueryTrip(bool isSameAsShardingTailComparer, CircuitBreakerMethodNameEnum methodNameEnum)
|
||||
{
|
||||
if (!MethodNameSupports.TryGetValue(methodNameEnum, out var methodName))
|
||||
if (!CircuitBreakerMethodNameSupports.TryGetValue(methodNameEnum, out var methodName))
|
||||
{
|
||||
throw new ArgumentException(methodNameEnum.ToString());
|
||||
}
|
||||
|
||||
if (_seqQueryDefaults.ContainsKey(methodName))
|
||||
{
|
||||
_seqQueryDefaults[methodName] = asc;
|
||||
_seqQueryDefaults[methodName] = isSameAsShardingTailComparer;
|
||||
}
|
||||
else
|
||||
{
|
||||
_seqQueryDefaults.Add(methodName, asc);
|
||||
_seqQueryDefaults.Add(methodName, isSameAsShardingTailComparer);
|
||||
}
|
||||
}
|
||||
/// <summary>
|
||||
|
|
|
@ -6,7 +6,7 @@ using System.Threading.Tasks;
|
|||
|
||||
namespace ShardingCore.Sharding.EntityQueryConfigurations
|
||||
{
|
||||
public enum QueryableMethodNameEnum
|
||||
public enum CircuitBreakerMethodNameEnum
|
||||
{
|
||||
First,
|
||||
FirstOrDefault,
|
||||
|
@ -16,6 +16,26 @@ namespace ShardingCore.Sharding.EntityQueryConfigurations
|
|||
SingleOrDefault,
|
||||
Any,
|
||||
All,
|
||||
Contains
|
||||
Contains,
|
||||
Max,
|
||||
Min
|
||||
}
|
||||
public enum LimitMethodNameEnum
|
||||
{
|
||||
First,
|
||||
FirstOrDefault,
|
||||
Last,
|
||||
LastOrDefault,
|
||||
Single,
|
||||
SingleOrDefault,
|
||||
Any,
|
||||
All,
|
||||
Contains,
|
||||
Max,
|
||||
Min,
|
||||
Count,
|
||||
LongCount,
|
||||
Sum,
|
||||
Average
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,20 +126,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
|||
private IEnumerable<ISqlRouteUnit> ReOrderTableTails(IEnumerable<ISqlRouteUnit> sqlRouteUnits)
|
||||
{
|
||||
var streamMergeContext = GetStreamMergeContext();
|
||||
var equalPropertyOrder = ExecuteOrderEqualPropertyOrder();
|
||||
if (streamMergeContext.IsSeqQuery())
|
||||
{
|
||||
return sqlRouteUnits.OrderByAscDescIf(o => o.TableRouteResult.ReplaceTables.First().Tail,
|
||||
(equalPropertyOrder ? streamMergeContext.TailComparerIsAsc : !streamMergeContext.TailComparerIsAsc), streamMergeContext.ShardingTailComparer);
|
||||
return sqlRouteUnits.OrderByAscDescIf(o => o.TableRouteResult.ReplaceTables.First().Tail, streamMergeContext.TailComparerNeedReverse, streamMergeContext.ShardingTailComparer);
|
||||
}
|
||||
|
||||
return sqlRouteUnits;
|
||||
}
|
||||
|
||||
protected virtual bool ExecuteOrderEqualPropertyOrder()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
/// <summary>
|
||||
/// 每个数据源下的分表结果按 maxQueryConnectionsLimit 进行组合分组每组大小 maxQueryConnectionsLimit
|
||||
/// ConnectionModeEnum为用户配置或者系统自动计算,哪怕是用户指定也是按照maxQueryConnectionsLimit来进行分组。
|
||||
|
|
|
@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
|
|||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
|
||||
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
|
||||
{
|
||||
public MaxAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -92,5 +92,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
|
|||
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
|
||||
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
|
||||
}
|
||||
|
||||
protected override IParallelExecuteControl<TResult1> CreateParallelExecuteControl<TResult1>(IParallelExecutor<TResult1> executor)
|
||||
{
|
||||
return AnyElementParallelExecuteControl<TResult1>.Create(GetStreamMergeContext(), executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
|
|||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,TResult>
|
||||
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,TResult>
|
||||
{
|
||||
public MinAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -98,5 +98,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
|
|||
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
|
||||
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
|
||||
}
|
||||
|
||||
protected override IParallelExecuteControl<TResult1> CreateParallelExecuteControl<TResult1>(IParallelExecutor<TResult1> executor)
|
||||
{
|
||||
return AnyElementParallelExecuteControl<TResult1>.Create(GetStreamMergeContext(), executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,10 +43,5 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
{
|
||||
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
|
||||
}
|
||||
|
||||
protected override bool ExecuteOrderEqualPropertyOrder()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,10 +43,5 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
{
|
||||
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
|
||||
}
|
||||
|
||||
protected override bool ExecuteOrderEqualPropertyOrder()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
|
|||
return true;
|
||||
if (_seqQueryProvider.IsSeqQuery())
|
||||
{
|
||||
if (_seqQueryProvider.IsParallelExecute())
|
||||
if (_seqQueryProvider.CanTrip())
|
||||
{
|
||||
if (SeqConditionalTrip(results))
|
||||
{
|
||||
|
|
|
@ -12,6 +12,9 @@ using ShardingCore.Sharding.StreamMergeEngines;
|
|||
|
||||
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
|
||||
{
|
||||
/// <summary>
|
||||
/// use First、FirstOrDefault、Last、LastOrDefault、Max、Min
|
||||
/// </summary>
|
||||
internal class AnyElementCircuitBreaker : AbstractCircuitBreaker
|
||||
{
|
||||
public AnyElementCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
|
||||
|
|
|
@ -34,7 +34,7 @@ namespace ShardingCore.Sharding
|
|||
* @Date: Monday, 25 January 2021 11:38:27
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public class StreamMergeContext<TEntity> : ISeqQueryProvider,IDisposable
|
||||
public class StreamMergeContext<TEntity> : ISeqQueryProvider, IDisposable
|
||||
#if !EFCORE2
|
||||
, IAsyncDisposable
|
||||
#endif
|
||||
|
@ -79,14 +79,14 @@ namespace ShardingCore.Sharding
|
|||
private readonly IShardingEntityConfigOptions _shardingEntityConfigOptions;
|
||||
|
||||
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
|
||||
|
||||
private readonly bool _seqQuery = false;
|
||||
|
||||
public IComparer<string> ShardingTailComparer { get; } = Comparer<string>.Default;
|
||||
/// <summary>
|
||||
/// 分表后缀比较是否重排正序
|
||||
/// </summary>
|
||||
public bool TailComparerIsAsc { get; } = true;
|
||||
|
||||
private readonly bool _seqQuery=false;
|
||||
|
||||
public IComparer<string> ShardingTailComparer { get; } = Comparer<string>.Default;
|
||||
public bool TailComparerNeedReverse { get; } = true;
|
||||
|
||||
private int _maxParallelExecuteCount;
|
||||
|
||||
|
@ -121,7 +121,7 @@ namespace ShardingCore.Sharding
|
|||
|
||||
_maxParallelExecuteCount = _shardingDbContext.GetVirtualDataSource().ConfigurationParams.MaxQueryConnectionsLimit;
|
||||
|
||||
if (IsSingleShardingEntityQuery() && !Skip.HasValue&&IsCrossTable &&!IsUnSupportSharding())
|
||||
if (IsSingleShardingEntityQuery() && !Skip.HasValue && IsCrossTable && !IsUnSupportSharding())
|
||||
{
|
||||
var singleShardingEntityType = GetSingleShardingEntityType();
|
||||
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(MergeQueryCompilerContext.GetShardingDbContextType()));
|
||||
|
@ -131,13 +131,14 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
ShardingTailComparer =
|
||||
virtualTable.EntityQueryMetadata.DefaultTailComparer ?? Comparer<string>.Default;
|
||||
TailComparerNeedReverse = virtualTable.EntityQueryMetadata.DefaultTailComparerNeedReverse;
|
||||
string methodName = null;
|
||||
if (!MergeQueryCompilerContext.IsEnumerableQuery())
|
||||
{
|
||||
methodName = ((MethodCallExpression)MergeQueryCompilerContext.GetQueryExpression()).Method.Name;
|
||||
if (virtualTable.EntityQueryMetadata.TryGetConnectionsLimit(methodName,out var limit))
|
||||
if (virtualTable.EntityQueryMetadata.TryGetConnectionsLimit(methodName, out var limit))
|
||||
{
|
||||
_maxParallelExecuteCount = Math.Min(limit,_maxParallelExecuteCount);
|
||||
_maxParallelExecuteCount = Math.Min(limit, _maxParallelExecuteCount);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,12 +147,34 @@ namespace ShardingCore.Sharding
|
|||
out var tailComparerIsAsc))
|
||||
{
|
||||
_seqQuery = true;
|
||||
TailComparerIsAsc = tailComparerIsAsc;
|
||||
if (!tailComparerIsAsc)
|
||||
{
|
||||
TailComparerNeedReverse = !TailComparerNeedReverse;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/// <summary>
|
||||
/// 是否需要判断order
|
||||
/// </summary>
|
||||
/// <param name="methodName"></param>
|
||||
/// <param name="propertyOrders"></param>
|
||||
/// <returns></returns>
|
||||
private bool EffectOrder(string methodName, PropertyOrder[] propertyOrders)
|
||||
{
|
||||
if ((methodName==null ||
|
||||
nameof(Queryable.First) == methodName ||
|
||||
nameof(Queryable.FirstOrDefault) == methodName ||
|
||||
nameof(Queryable.Last) == methodName ||
|
||||
nameof(Queryable.LastOrDefault) == methodName ||
|
||||
nameof(Queryable.Single) == methodName ||
|
||||
nameof(Queryable.SingleOrDefault) == methodName) &&
|
||||
propertyOrders.Length > 0)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
/// <summary>
|
||||
/// 尝试获取当前方法是否采用顺序查询,如果有先判断排序没有的情况下判断默认
|
||||
/// </summary>
|
||||
/// <param name="propertyOrders"></param>
|
||||
|
@ -160,9 +183,11 @@ namespace ShardingCore.Sharding
|
|||
/// <param name="methodName"></param>
|
||||
/// <param name="tailComparerIsAsc"></param>
|
||||
/// <returns></returns>
|
||||
private bool TryGetSequenceQuery(PropertyOrder[] propertyOrders, Type singleShardingEntityType,IVirtualTable virtualTable,string methodName, out bool tailComparerIsAsc)
|
||||
private bool TryGetSequenceQuery(PropertyOrder[] propertyOrders, Type singleShardingEntityType, IVirtualTable virtualTable, string methodName, out bool tailComparerIsAsc)
|
||||
{
|
||||
if (propertyOrders.IsNotEmpty())
|
||||
var effectOrder = EffectOrder(methodName,propertyOrders);
|
||||
|
||||
if (effectOrder)
|
||||
{
|
||||
var primaryOrder = propertyOrders[0];
|
||||
//不是多级不能是匿名对象
|
||||
|
@ -171,18 +196,33 @@ namespace ShardingCore.Sharding
|
|||
if (virtualTable.EnableEntityQuery && virtualTable.EntityQueryMetadata.TryContainsComparerOrder(primaryOrder.PropertyExpression, out var asc))
|
||||
{
|
||||
tailComparerIsAsc = asc ? primaryOrder.IsAsc : !primaryOrder.IsAsc;
|
||||
//如果是获取最后一个还需要再次翻转
|
||||
if (nameof(Queryable.Last) == methodName || nameof(Queryable.LastOrDefault) == methodName)
|
||||
{
|
||||
tailComparerIsAsc = !tailComparerIsAsc;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Max和Min不受order影响
|
||||
if (nameof(Queryable.Max) == methodName || nameof(Queryable.Min) == methodName)
|
||||
{
|
||||
//如果是max或者min
|
||||
if (virtualTable.EnableEntityQuery && SelectContext.SelectProperties.Count == 1 && virtualTable.EntityQueryMetadata.TryContainsComparerOrder(SelectContext.SelectProperties[0].PropertyName, out var asc))
|
||||
{
|
||||
tailComparerIsAsc = asc ? nameof(Queryable.Min) == methodName : nameof(Queryable.Max) == methodName;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (virtualTable.EnableEntityQuery && methodName != null &&
|
||||
virtualTable.EntityQueryMetadata.TryGetDefaultSequenceQueryTrip(methodName,out var defaultAsc))
|
||||
virtualTable.EntityQueryMetadata.TryGetDefaultSequenceQueryTrip(methodName, out var defaultAsc))
|
||||
{
|
||||
tailComparerIsAsc = defaultAsc;
|
||||
return true;
|
||||
}
|
||||
|
||||
tailComparerIsAsc = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -276,7 +316,7 @@ namespace ShardingCore.Sharding
|
|||
|
||||
public bool IsSingleShardingEntityQuery()
|
||||
{
|
||||
return QueryEntities.Count(o=>MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o)) == 1;
|
||||
return QueryEntities.Count(o => MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o)) == 1;
|
||||
}
|
||||
public Type GetSingleShardingEntityType()
|
||||
{
|
||||
|
@ -321,7 +361,7 @@ namespace ShardingCore.Sharding
|
|||
/// <returns></returns>
|
||||
private bool IsUseReadWriteSeparation()
|
||||
{
|
||||
return _shardingDbContext.IsUseReadWriteSeparation()&&_shardingDbContext.CurrentIsReadWriteSeparation();
|
||||
return _shardingDbContext.IsUseReadWriteSeparation() && _shardingDbContext.CurrentIsReadWriteSeparation();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -330,7 +370,7 @@ namespace ShardingCore.Sharding
|
|||
/// <returns></returns>
|
||||
public bool IsParallelQuery()
|
||||
{
|
||||
return MergeQueryCompilerContext.IsParallelQuery();
|
||||
return MergeQueryCompilerContext.IsParallelQuery();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -431,7 +471,7 @@ namespace ShardingCore.Sharding
|
|||
return _seqQuery;
|
||||
}
|
||||
|
||||
public bool IsParallelExecute()
|
||||
public bool CanTrip()
|
||||
{
|
||||
return TableRouteResults.Length > GetMaxQueryConnectionsLimit();
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ using ShardingCore.Core.Internal.Visitors.GroupBys;
|
|||
using ShardingCore.Core.Internal.Visitors.Selects;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Visitors.Selects;
|
||||
|
||||
namespace ShardingCore.Core.Internal.Visitors
|
||||
{
|
||||
|
@ -111,12 +112,25 @@ namespace ShardingCore.Core.Internal.Visitors
|
|||
{
|
||||
if (_selectContext.SelectProperties.IsEmpty())
|
||||
{
|
||||
var expression = ((node.Arguments[1] as UnaryExpression).Operand as LambdaExpression).Body as NewExpression;
|
||||
if (expression != null)
|
||||
var expression = ((node.Arguments[1] as UnaryExpression).Operand as LambdaExpression).Body;
|
||||
if (expression is NewExpression newExpression)
|
||||
{
|
||||
var aggregateDiscoverVisitor = new QuerySelectDiscoverVisitor(_selectContext);
|
||||
aggregateDiscoverVisitor.Visit(expression);
|
||||
aggregateDiscoverVisitor.Visit(newExpression);
|
||||
} else if (expression is MemberExpression memberExpression)
|
||||
{
|
||||
|
||||
var declaringType = memberExpression.Member.DeclaringType;
|
||||
var memberName = memberExpression.Member.Name;
|
||||
var propertyInfo = declaringType.GetProperty(memberName);
|
||||
_selectContext.SelectProperties.Add(new SelectProperty(declaringType, propertyInfo));
|
||||
//memberExpression.Acc
|
||||
}
|
||||
//if (expression != null)
|
||||
//{
|
||||
// var aggregateDiscoverVisitor = new QuerySelectDiscoverVisitor(_selectContext);
|
||||
// aggregateDiscoverVisitor.Visit(expression);
|
||||
//}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue