sharding page process part

This commit is contained in:
xuejiaming 2021-09-02 17:33:51 +08:00
parent 631115a317
commit 2f0a6b3af9
43 changed files with 855 additions and 335 deletions

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Sample.SqlServer.Domain.Entities;
using ShardingCore.Sharding.PaginationConfigurations;
namespace Sample.SqlServer.Shardings
{
public class SysUserModPaginationConfiguration : IPaginationConfiguration<SysUserMod>
{
public void Configure(PaginationBuilder<SysUserMod> builder)
{
builder.PaginationSequence(o => o.Id)
.UseQueryMatch(PaginationMatchEnum.Owner | PaginationMatchEnum.Named | PaginationMatchEnum.FirstMatch);
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq.Expressions;
using Sample.SqlServer.Domain.Entities;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.VirtualRoutes;
using ShardingCore.VirtualRoutes.Mods;
@ -25,5 +26,10 @@ namespace Sample.SqlServer.Shardings
public SysUserModVirtualTableRoute() : base(2,3)
{
}
public override IPaginationConfiguration<SysUserMod> CreatePaginationConfiguration()
{
return new SysUserModPaginationConfiguration();
}
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Core.ShardingPage.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:46:13
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingPageAccessor
{
ShardingPageContext ShardingPageContext { get; set; }
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Core.ShardingPage.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:46:37
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingPageManager
{
ShardingPageContext Current { get; }
/// <summary>
/// 创建分页scope
/// </summary>
/// <returns></returns>
ShardingPageScope CreateScope();
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using ShardingCore.Core.ShardingPage.Abstractions;
namespace ShardingCore.Core.ShardingPage
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:47:35
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingPageAccessor:IShardingPageAccessor
{
private static AsyncLocal<ShardingPageContext> _shardingPageContext = new AsyncLocal<ShardingPageContext>();
/// <inheritdoc />
public ShardingPageContext ShardingPageContext
{
get => _shardingPageContext.Value;
set => _shardingPageContext.Value = value;
}
}
}

View File

@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Core.ShardingPage
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:46:55
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingPageContext
{
public ICollection<RouteQueryResult<long>> RouteQueryResults { get; }
private ShardingPageContext()
{
RouteQueryResults = new LinkedList<RouteQueryResult<long>>();
}
public static ShardingPageContext Create()
{
return new ShardingPageContext();
}
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.ShardingPage.Abstractions;
namespace ShardingCore.Core.ShardingPage
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:50:13
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingPageManager: IShardingPageManager
{
private readonly IShardingPageAccessor _shardingPageAccessor;
public ShardingPageManager(IShardingPageAccessor shardingPageAccessor)
{
_shardingPageAccessor = shardingPageAccessor;
}
public ShardingPageContext Current => _shardingPageAccessor.ShardingPageContext;
public ShardingPageScope CreateScope()
{
var shardingPageScope = new ShardingPageScope(_shardingPageAccessor);
_shardingPageAccessor.ShardingPageContext = ShardingPageContext.Create();
return shardingPageScope;
}
}
}

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.ShardingPage.Abstractions;
namespace ShardingCore.Core.ShardingPage
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 13:49:12
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingPageScope : IDisposable
{
/// <summary>
/// 分表配置访问器
/// </summary>
public IShardingPageAccessor ShardingPageAccessor { get; }
/// <summary>
/// 构造函数
/// </summary>
/// <param name="shardingPageAccessor"></param>
public ShardingPageScope(IShardingPageAccessor shardingPageAccessor)
{
ShardingPageAccessor = shardingPageAccessor;
}
/// <summary>
/// 回收
/// </summary>
public void Dispose()
{
ShardingPageAccessor.ShardingPageContext = null;
}
}
}

View File

@ -6,6 +6,7 @@ using ShardingCore.Core.QueryRouteManagers;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
{
@ -17,6 +18,11 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
*/
public abstract class AbstractVirtualTableRoute<T, TKey> : IVirtualTableRoute<T> where T : class, IShardingTable
{
public virtual IPaginationConfiguration<T> CreatePaginationConfiguration()
{
return null;
}
public Type ShardingEntityType => typeof(T);
/// <summary>
/// 如何将分表字段转成对应的类型

View File

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualRoutes.TableRoutes
{
@ -44,5 +45,10 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes
public interface IVirtualTableRoute<T> : IVirtualTableRoute where T : class, IShardingTable
{
/// <summary>
/// 返回null就是表示不开启分页配置
/// </summary>
/// <returns></returns>
IPaginationConfiguration<T> CreatePaginationConfiguration();
}
}

View File

@ -23,5 +23,22 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
public ISet<IPhysicTable> ReplaceTables { get; }
protected bool Equals(RouteResult other)
{
return Equals(ReplaceTables, other.ReplaceTables);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((RouteResult) obj);
}
public override int GetHashCode()
{
return (ReplaceTables != null ? ReplaceTables.GetHashCode() : 0);
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualTables
{
@ -22,6 +23,14 @@ namespace ShardingCore.Core.VirtualTables
/// 分表配置
/// </summary>
ShardingTableConfig ShardingConfig { get; }
/// <summary>
/// 分页配置
/// </summary>
PaginationMetadata PaginationMetadata { get; }
/// <summary>
/// 是否启用分页配置
/// </summary>
bool EnablePagination => PaginationMetadata != null;
/// <summary>
/// 获取所有的物理表

View File

@ -8,6 +8,7 @@ using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Utils;
namespace ShardingCore.Core.VirtualTables
@ -28,12 +29,22 @@ namespace ShardingCore.Core.VirtualTables
public Type EntityType => typeof(T);
public ShardingTableConfig ShardingConfig { get; }
public PaginationMetadata PaginationMetadata { get; }
private readonly List<IPhysicTable> _physicTables = new List<IPhysicTable>();
public OneDbVirtualTable(IVirtualTableRoute<T> virtualTableRoute)
{
_virtualTableRoute = virtualTableRoute;
ShardingConfig = ShardingKeyUtil.Parse(EntityType);
var paginationConfiguration = virtualTableRoute.CreatePaginationConfiguration();
if (paginationConfiguration != null)
{
PaginationMetadata = new PaginationMetadata();
var paginationBuilder = new PaginationBuilder<T>(PaginationMetadata);
paginationConfiguration.Configure(paginationBuilder);
}
}
public List<IPhysicTable> GetAllPhysicTables()

View File

@ -16,6 +16,8 @@ using ShardingCore.Core.QueryRouteManagers;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.ShardingAccessors.Abstractions;
using ShardingCore.Core.ShardingPage;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
@ -87,9 +89,15 @@ namespace ShardingCore
services.AddSingleton<IShardingAccessor, ShardingAccessor>();
services.AddSingleton<IShardingScopeFactory, ShardingScopeFactory>();
services.AddSingleton<IRouteTailFactory, RouteTailFactory>();
services.AddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();
//route manage
services.AddSingleton<IShardingRouteManager, ShardingRouteManager>();
services.AddSingleton<IShardingRouteAccessor, ShardingRouteAccessor>();
services.AddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();
//sharding page
services.AddSingleton<IShardingPageManager, ShardingPageManager>();
services.AddSingleton<IShardingPageAccessor, ShardingPageAccessor>();
return services;
}

View File

@ -62,5 +62,28 @@ namespace ShardingCore.Extensions
}
}
/// <summary>
/// 是否有差异
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source"></param>
/// <returns></returns>
public static bool HasDifference<T>(this IEnumerable<T> source)
{
return source.Distinct().Count() > 1;
}
/// <summary>
/// 是否有差异
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TKey"></typeparam>
/// <param name="source"></param>
/// <param name="keySelector"></param>
/// <returns></returns>
public static bool HasDifference<T,TKey>(this IEnumerable<T> source, Func<T, TKey> keySelector)
{
return source.Select(keySelector).Distinct().Count() > 1;
}
}
}

View File

@ -16,6 +16,12 @@ namespace ShardingCore.Sharding.PaginationConfigurations
*/
public class PaginationBuilder<TEntity> where TEntity:class,IShardingTable
{
private readonly PaginationMetadata _metadata;
public PaginationBuilder(PaginationMetadata metadata)
{
_metadata = metadata;
}
/// <summary>
/// 分页顺序
/// </summary>
@ -23,7 +29,27 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <typeparam name="TProperty"></typeparam>
public PaginationOrderPropertyBuilder PaginationSequence<TProperty>(Expression<Func<TEntity, TProperty>> orderPropertyExpression)
{
return new PaginationOrderPropertyBuilder(orderPropertyExpression);
return new PaginationOrderPropertyBuilder(orderPropertyExpression, _metadata);
}
/// <summary>
/// 配置当跳过多少条后开始启用只能分页
/// </summary>
/// <param name="skip"></param>
/// <returns></returns>
public PaginationBuilder<TEntity> ConfigUseShardingPageIfGeSkip(long skip)
{
_metadata.UseShardingPageIfGeSkipAvg = skip;
return this;
}
/// <summary>
/// 配置当分表数目小于多少后直接取到内存不在流式处理
/// </summary>
/// <param name="count"></param>
/// <returns></returns>
public PaginationBuilder<TEntity> ConfigTakeInMemoryCountIfLe(int count)
{
_metadata.TakeInMemoryCountIfLe = count;
return this;
}
}
}

View File

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using Microsoft.EntityFrameworkCore.Infrastructure;
namespace ShardingCore.Sharding.PaginationConfigurations
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 7:45:55
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class PaginationConfig
{
public PaginationConfig(LambdaExpression orderPropertyExpression, PaginationMatchEnum paginationMatchEnum= PaginationMatchEnum.Owner, IComparer<string> tailComparer=null)
{
OrderPropertyExpression = orderPropertyExpression;
OrderPropertyInfo = orderPropertyExpression.GetPropertyAccess();
PropertyName = OrderPropertyInfo.Name;
PaginationMatchEnum = paginationMatchEnum;
TailComparer = tailComparer ?? Comparer<string>.Default;
}
public LambdaExpression OrderPropertyExpression { get; set; }
public IComparer<string> TailComparer { get; set; }
public PaginationMatchEnum PaginationMatchEnum { get; set; }
public PropertyInfo OrderPropertyInfo { get; set; }
/// <summary>
/// 如果查询没发现排序就将当前配置追加上去
/// </summary>
public bool AppendIfOrderNone { get; set; }
public string PropertyName { get;}
protected bool Equals(PaginationConfig other)
{
return PropertyName == other.PropertyName;
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((PaginationConfig) obj);
}
public override int GetHashCode()
{
return (PropertyName != null ? PropertyName.GetHashCode() : 0);
}
}
}

View File

@ -8,9 +8,20 @@ namespace ShardingCore.Sharding.PaginationConfigurations
* @Date: Wednesday, 01 September 2021 21:27:25
* @Email: 326308290@qq.com
*/
[Flags]
public enum PaginationMatchEnum
{
/// <summary>
/// 必须是当前对象的属性
/// </summary>
Owner = 1,
Named=1<<1
/// <summary>
/// 只要名称一样就可以了
/// </summary>
Named = 1 << 1,
/// <summary>
/// 仅第一个匹配就可以了
/// </summary>
FirstMatch = 1 << 2
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Sharding.PaginationConfigurations
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 7:45:16
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
/// <summary>
/// 分页配置元数据
/// </summary>
public class PaginationMetadata
{
public ISet<PaginationConfig> PaginationConfigs = new HashSet<PaginationConfig>();
/// <summary>
/// 配置生效当跳过多少条后 GREATER THAN OR EQUAL
/// </summary>
public long UseShardingPageIfGeSkipAvg { get; set; } = 3000L;
/// <summary>
/// 分表发现如果少于多少条后直接取到内存 LESS THAN OR EQUAL
/// </summary>
public int TakeInMemoryCountIfLe { get; set; } = 100;
}
}

View File

@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;
using Microsoft.EntityFrameworkCore.Infrastructure;
namespace ShardingCore.Sharding.PaginationConfigurations
{
@ -14,15 +12,12 @@ namespace ShardingCore.Sharding.PaginationConfigurations
*/
public class PaginationOrderPropertyBuilder
{
private readonly LambdaExpression _orderPropertyExpression;
private IComparer<string> _tailComparer;
private PaginationMatchEnum _paginationMatchEnum;
private PropertyInfo _orderPropertyInfo;
private readonly PaginationConfig _paginationConfig;
public PaginationOrderPropertyBuilder(LambdaExpression orderPropertyExpression)
public PaginationOrderPropertyBuilder(LambdaExpression orderPropertyExpression,PaginationMetadata metadata)
{
_orderPropertyExpression = orderPropertyExpression;
_orderPropertyInfo = orderPropertyExpression.GetPropertyAccess();
_paginationConfig = new PaginationConfig(orderPropertyExpression);
metadata.PaginationConfigs.Add(_paginationConfig);
}
/// <summary>
@ -32,7 +27,8 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <returns></returns>
public PaginationOrderPropertyBuilder UseTailCompare(IComparer<string> tailComparer)
{
_tailComparer = tailComparer ?? throw new ArgumentException(nameof(tailComparer));
_paginationConfig.TailComparer= tailComparer ?? throw new ArgumentException(nameof(tailComparer));
return this;
}
/// <summary>
@ -42,7 +38,16 @@ namespace ShardingCore.Sharding.PaginationConfigurations
/// <returns></returns>
public PaginationOrderPropertyBuilder UseQueryMatch(PaginationMatchEnum paginationMatchEnum)
{
_paginationMatchEnum = paginationMatchEnum;
_paginationConfig.PaginationMatchEnum = paginationMatchEnum;
return this;
}
/// <summary>
/// 如果查询没发现排序就将当前配置追加上去
/// </summary>
/// <returns></returns>
public PaginationOrderPropertyBuilder UseAppendIfOrderNone()
{
_paginationConfig.AppendIfOrderNone = true;
return this;
}
}

View File

@ -104,5 +104,10 @@ namespace ShardingCore.Sharding
return this.SelectContext.SelectProperties.Any(o => o.IsAggregateMethod);
}
public IShardingDbContext GetShardingDbContext()
{
return _shardingDbContext;
}
}
}

View File

@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
@ -32,120 +33,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
public override TEnsureResult MergeResult()
{
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<decimal>)queryable).Average());
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<decimal?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<int>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<int?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(long) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<long>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<long?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(double) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<double>)queryable).Average()
);
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<double?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
if (typeof(float) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<float>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
return ConvertSum(average);
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = base.Execute(
queryable => ((IQueryable<float?>)queryable).Average()
);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
return ConvertSum(average);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<TEnsureResult> MergeResultAsync(
@ -158,7 +46,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
var average = result.Sum(o=>o.QueryResult) / result.Count;
return ConvertSum(average);
}
@ -169,7 +57,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
var sum = result.Sum(o => o.QueryResult);
var average = sum.HasValue ? sum / result.Count : default;
return ConvertSum(average);
}
@ -180,7 +69,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
var average = result.Sum(o => o.QueryResult) / result.Count;
return ConvertSum(average);
}
@ -191,7 +80,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
var sum = result.Sum(o => o.QueryResult);
var average = sum.HasValue ? sum / result.Count : default;
return ConvertSum(average);
}
@ -202,7 +92,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
var average = result.Sum(o => o.QueryResult) / result.Count;
return ConvertSum(average);
}
@ -213,7 +103,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
var sum = result.Sum(o => o.QueryResult);
var average = sum.HasValue ? sum / result.Count : default;
return ConvertSum(average);
}
@ -222,7 +113,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(
queryable => ((IQueryable<double>)queryable).AverageAsync(cancellationToken),
cancellationToken);
var average = result.Sum() / result.Count;
var average = result.Sum(o => o.QueryResult) / result.Count;
return ConvertSum(average);
}
@ -233,7 +124,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
var sum = result.Sum(o => o.QueryResult);
var average = sum.HasValue ? sum / result.Count : default;
return ConvertSum(average);
}
@ -244,7 +136,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum() / result.Count;
var average = result.Sum(o => o.QueryResult) / result.Count;
return ConvertSum(average);
}
@ -255,18 +147,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
cancellationToken);
if (result.IsEmpty())
return default;
var average = result.Sum().HasValue ? result.Sum() / result.Count : default;
var sum = result.Sum(o => o.QueryResult);
var average = sum.HasValue ? sum / result.Count : default;
return ConvertSum(average);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
$"not support {GetMethodCallExpression().ShardingPrint()} result {typeof(TEnsureResult)}");
}
private TEnsureResult ConvertSum<TNumber>(TNumber number)

View File

@ -6,6 +6,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -27,14 +28,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Max());
return result.Max();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).MaxAsync(cancellationToken), cancellationToken);
return result.Max();
return result.Max(o=>o.QueryResult);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,14 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Min());
return result.Min();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).MinAsync(cancellationToken), cancellationToken);
return result.Min();
return result.Min(o=>o.QueryResult);
}
}
}

View File

@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
@ -30,94 +31,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
public override TEnsureResult MergeResult()
{
if (typeof(decimal) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<decimal>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(decimal?) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<decimal?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(int) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<int>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(int?) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<int?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(long) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<long>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(long?) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<long?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(double) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<double>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(double?) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<double?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(float) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<float>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
if (typeof(float?) == typeof(TEnsureResult))
{
var result = base.Execute(queryable => ((IQueryable<float?>)queryable).Sum());
if (result.IsEmpty())
return default;
var sum = result.Sum();
return ConvertSum(sum);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<TEnsureResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -127,7 +42,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<decimal>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o=>o.QueryResult);
return ConvertSum(sum);
}
if (typeof(decimal?) == typeof(TEnsureResult))
@ -135,7 +50,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<decimal?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o=>o.QueryResult);
return ConvertSum(sum);
}
if (typeof(int) == typeof(TEnsureResult))
@ -143,7 +58,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<int>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o=>o.QueryResult);
return ConvertSum(sum);
}
if (typeof(int?) == typeof(TEnsureResult))
@ -151,7 +66,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<int?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(long) == typeof(TEnsureResult))
@ -159,7 +74,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<long>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(long?) == typeof(TEnsureResult))
@ -167,7 +82,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<long?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(double) == typeof(TEnsureResult))
@ -175,7 +90,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<double>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(double?) == typeof(TEnsureResult))
@ -183,7 +98,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<double?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(float) == typeof(TEnsureResult))
@ -191,7 +106,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<float>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
if (typeof(float?) == typeof(TEnsureResult))
@ -199,18 +114,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var result = await base.ExecuteAsync(queryable => ((IQueryable<float?>)queryable).SumAsync(cancellationToken), cancellationToken);
if (result.IsEmpty())
return default;
var sum = result.Sum();
var sum = result.Sum(o => o.QueryResult);
return ConvertSum(sum);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
$"not support {GetMethodCallExpression().ShardingPrint()} result {typeof(TEnsureResult)}");
}
private TEnsureResult ConvertSum<TNumber>(TNumber number)
{

View File

@ -38,7 +38,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.All(o => o);
return result.All(o => o.QueryResult);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
@ -29,16 +30,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Any());
return result.Any(o => o);
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.Any(o => o);
return result.Any(o => o.QueryResult);
}
}

View File

@ -75,7 +75,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
private IAsyncEnumerator<T> GetShardingEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var tableResult = _mergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
@ -114,7 +114,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var tableResult = _mergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{

View File

@ -4,6 +4,7 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
@ -25,16 +26,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Contains(GetConstantItem()));
return result.Any(o => o);
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken);
return result.Any(o => o);
return result.Any(o => o.QueryResult);
}
}

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
@ -18,8 +19,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/
public class CountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity,int>
{
private readonly IShardingPageManager _shardingPageManager;
public CountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
}
public override int MergeResult()
@ -31,6 +34,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
if (_shardingPageManager.Current != null)
{
foreach (var routeQueryResult in result)
{
_shardingPageManager.Current.RouteQueryResults.Add(new RouteQueryResult<long>(routeQueryResult.RouteResult, routeQueryResult.QueryResult));
}
}
return result.Sum(o=>o.QueryResult);
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:38:05
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorAsyncStreamMergeEngine<TEntity>: AbstractEnumeratorStreamMergeEngine<TEntity>
{
public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IAsyncEnumerator<TEntity> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators();
if (dbStreamMergeAsyncEnumerators.IsEmpty())
throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty");
return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators);
}
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators();
public abstract IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
public async Task<IAsyncEnumerator<TEntity>> DoGetAsyncEnumerator(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public virtual IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
var useOriginal = routeCount > 1;
DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IEnumerator<TEntity> GetEnumerator()
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,52 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:35:39
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity>:IEnumeratorStreamMergeEngine<TEntity>
{
public StreamMergeContext<TEntity> StreamMergeContext { get; }
public ConcurrentDictionary<RouteResult,DbContext> DbContextQueryStore { get; }
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
StreamMergeContext = streamMergeContext;
DbContextQueryStore = new ConcurrentDictionary<RouteResult, DbContext>();
}
public abstract IAsyncEnumerator<TEntity> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken());
public abstract IEnumerator<TEntity> GetEnumerator();
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Dispose()
{
if (DbContextQueryStore.IsNotEmpty())
{
DbContextQueryStore.Values.ForEach(dbContext =>
{
dbContext.Dispose();
});
}
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:38:13
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AbstractEnumeratorSyncStreamMergeEngine<TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
{
public AbstractEnumeratorSyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IAsyncEnumerator<TEntity> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}
public override IEnumerator<TEntity> GetEnumerator()
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:15:34
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IEnumeratorStreamMergeEngine<TEntity> : IAsyncEnumerable<TEntity>, IEnumerable<TEntity>, IDisposable
{
}
}

View File

@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 16:16:12
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class NormalEnumeratorAsyncStreamMergeEngine<TEntity>:AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private readonly bool _multiRouteQuery;
public NormalEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
_multiRouteQuery = streamMergeContext.RouteResults.Count() > 1;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var tableResult = StreamMergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
return streamEnumerators;
}
public override IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (_multiRouteQuery && StreamMergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,100 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 16:29:06
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class SequenceEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private IShardingPageManager _shardingPageManager;
private IVirtualTableManager _virtualTableManager;
public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var routeQueryResults = _shardingPageManager.Current.RouteQueryResults.ToList();
if (routeQueryResults.Any(o => o.RouteResult.ReplaceTables.Count(p=>p.IsShardingTable())!=1)
|| routeQueryResults.HasDifference(o=>o.RouteResult.ReplaceTables.First().EntityType))
throw new InvalidOperationException($"error sharding page:[{StreamMergeContext.GetOriginalQueryable().Expression.ShardingPrint()}]");
var shardingEntityType = routeQueryResults[0].RouteResult.ReplaceTables.FirstOrDefault(o=>o.IsShardingTable()).EntityType;
var virtualTable = _virtualTableManager.GetVirtualTable(StreamMergeContext.GetShardingDbContext().ShardingDbContextType, shardingEntityType);
if (!virtualTable.EnablePagination)
{
throw new ShardingCoreException("not support Sequence enumerator");
}
if (base.StreamMergeContext.Orders.IsEmpty())
{
var append = virtualTable.PaginationMetadata.PaginationConfigs.FirstOrDefault(o=>o.AppendIfOrderNone);
if (append != null)
{
StreamMergeContext.GetOriginalQueryable().OrderBy("")
}
}
var tableResult = StreamMergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
return streamEnumerators;
}
public override IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (_multiRouteQuery && StreamMergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute(queryable => ((IQueryable<TResult>)queryable).First());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).First();
return q.First();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())

View File

@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
@ -31,20 +32,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).FirstOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).FirstOrDefault();
return q.FirstOrDefault();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,21 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Last());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).Last();
return q.Last();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).LastOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).LastOrDefault();
return q.LastOrDefault();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
@ -25,8 +26,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/
public class LongCountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity,long>
{
private readonly IShardingPageManager _shardingPageManager;
public LongCountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_shardingPageManager= ShardingContainer.GetService<IShardingPageManager>();
}
public override long MergeResult()
@ -39,6 +42,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).LongCountAsync(cancellationToken), cancellationToken);
if (_shardingPageManager.Current != null)
{
foreach (var routeQueryResult in result)
{
_shardingPageManager.Current.RouteQueryResults.Add(routeQueryResult);
}
}
return result.Sum(o=>o.QueryResult);
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,20 +29,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute( queryable => ((IQueryable<TResult>)queryable).Single());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).Single();
return q.Single();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
@ -28,20 +29,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override TResult MergeResult<TResult>()
{
var result = base.Execute(queryable => ((IQueryable<TResult>)queryable).SingleOrDefault());
var q = result.Where(o => o != null).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).SingleOrDefault();
return q.SingleOrDefault();
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())