优化数据合并引擎

This commit is contained in:
xuejmnet 2021-01-28 17:22:06 +08:00
parent 08bb2549b0
commit 9ff7b12d33
27 changed files with 678 additions and 303 deletions

View File

@ -195,7 +195,7 @@ route
## Api
方法 | Method | SqlServer Unit Test | MySql Unit Test
方法 | Method | [SqlServer Unit Test](https://github.com/xuejmnet/sharding-core/blob/main/test/ShardingCore.Test50/ShardingTest.cs) | [MySql Unit Test](https://github.com/xuejmnet/sharding-core/blob/main/test/ShardingCore.Test50.MySql/ShardingTest.cs)
--- |--- |--- |---
获取集合 |ToShardingListAsync |8 |8
第一条 |ShardingFirstOrDefaultAsync |5 |5

View File

@ -36,6 +36,7 @@ namespace ShardingCore.MySql
var options = new MySqlOptions();
configure(options);
services.AddSingleton(options);
services.AddShardingCore();
services.AddScoped<IVirtualDbContext, VirtualDbContext>();
services.AddScoped<IDbContextOptionsProvider, SqlServerDbContextOptionsProvider>();

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
namespace ShardingCore.Core.Internal.PriorityQueues {
/// <summary>
/// 泛型优先队列 https://www.cnblogs.com/skyivben/archive/2009/04/18/1438731.html
/// 泛型优先队列 https://www.cnblogs.com/skyivben/archive/2009/04/18/1438731.html 优化T类型不需要实现IComparable
/// </summary>
/// <typeparam name="T">实现IComparable&lt;T&gt;的类型</typeparam>
internal class PriorityQueue<T> {

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 10:25:22
* @Email: 326308290@qq.com
*/
public interface IRouteRuleEngine
{
IEnumerable<RouteResult> Route<T>(RouteRuleContext<T> routeRuleContext);
}
}

View File

@ -0,0 +1,15 @@
using System;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 13:30:28
* @Email: 326308290@qq.com
*/
public interface IRoutingRuleEngineFactory
{
IRouteRuleEngine CreateEngine();
}
}

View File

@ -0,0 +1,92 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Extensions;
#if !EFCORE5
using ShardingCore.Extensions;
#endif
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 10:51:59
* @Email: 326308290@qq.com
*/
public class QueryRouteRuleEngines:IRouteRuleEngine
{
private readonly IVirtualTableManager _virtualTableManager;
public QueryRouteRuleEngines(IVirtualTableManager virtualTableManager)
{
_virtualTableManager = virtualTableManager;
}
public IEnumerable<RouteResult> Route<T>(RouteRuleContext<T> routeRuleContext)
{
Dictionary<IVirtualTable, ISet<IPhysicTable>> routeMaps = new Dictionary<IVirtualTable, ISet<IPhysicTable>>();
//先添加手动路由到当前上下文,之后将不再手动路由里面的自动路由添加到当前上下文
foreach (var kv in routeRuleContext.ManualTails)
{
var virtualTable = kv.Key;
var addTails = kv.Value;
var physicTables = virtualTable.GetAllPhysicTables().Where(o=>addTails.Contains(o.Tail));
if (!routeMaps.ContainsKey(virtualTable))
{
routeMaps.Add(virtualTable,physicTables.ToHashSet());
} else
{
foreach (var physicTable in physicTables)
{
routeMaps[virtualTable].Add(physicTable);
}
}
}
foreach (var kv in routeRuleContext.ManualPredicate)
{
var virtualTable = kv.Key;
var predicate = kv.Value;
var physicTables = virtualTable.RouteTo(new RouteConfig(null, null, null, predicate));
if (!routeMaps.ContainsKey(virtualTable))
{
routeMaps.Add(virtualTable, physicTables.ToHashSet());
}
else
{
foreach (var physicTable in physicTables)
{
routeMaps[virtualTable].Add(physicTable);
}
}
}
if (routeRuleContext.AutoParseRoute)
{
var shardingEntities = routeRuleContext.Queryable.ParseQueryableRoute();
foreach (var shardingEntity in shardingEntities)
{
var virtualTable = _virtualTableManager.GetVirtualTable(shardingEntity);
var physicTables = virtualTable.RouteTo(new RouteConfig(routeRuleContext.Queryable));
if (!routeMaps.ContainsKey(virtualTable))
{
routeMaps.Add(virtualTable, physicTables.ToHashSet());
}
else
{
foreach (var physicTable in physicTables)
{
routeMaps[virtualTable].Add(physicTable);
}
}
}
}
return routeMaps.Select(o => o.Value).Cartesian().Select(o=>new RouteResult(o));
}
}
}

View File

@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.PhysicTables;
#if !EFCORE5
using ShardingCore.Extensions;
#endif
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 10:18:09
* @Email: 326308290@qq.com
*/
public class RouteResult
{
public RouteResult(IEnumerable<IPhysicTable> replaceTables)
{
ReplaceTables = replaceTables.ToHashSet();
}
public ISet<IPhysicTable> ReplaceTables { get; }
}
}

View File

@ -0,0 +1,36 @@
using System;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 10:53:55
* @Email: 326308290@qq.com
*/
public class RouteRuleConfig
{
private bool _autoParseRoute = true;
public bool GetAutoParseRoute()
{
return _autoParseRoute;
}
/// <summary>
/// 启用自动路由
/// </summary>
public void EnableAutoRouteParse()
{
_autoParseRoute = true;
}
/// <summary>
/// 禁用自动路由
/// </summary>
public void DisableAutoRouteParse()
{
_autoParseRoute = false;
}
}
}

View File

@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 10:54:52
* @Email: 326308290@qq.com
*/
public class RouteRuleContext<T>
{
private readonly IVirtualTableManager _virtualTableManager;
public RouteRuleContext(IQueryable<T> queryable, IVirtualTableManager virtualTableManager)
{
_virtualTableManager = virtualTableManager;
Queryable = queryable;
}
public IQueryable<T> Queryable { get; }
/// <summary>
///
/// </summary>
public readonly Dictionary<IVirtualTable, Expression> ManualPredicate = new Dictionary<IVirtualTable, Expression>();
public readonly Dictionary<IVirtualTable, ISet<string>> ManualTails = new Dictionary<IVirtualTable, ISet<string>>();
public bool AutoParseRoute = true;
/// <summary>
/// 启用自动路由
/// </summary>
public void EnableAutoRouteParse()
{
AutoParseRoute = true;
}
/// <summary>
/// 禁用自动路由
/// </summary>
public void DisableAutoRouteParse()
{
AutoParseRoute = false;
}
/// <summary>
/// 添加手动路由
/// </summary>
/// <param name="predicate"></param>
/// <typeparam name="TShardingEntity"></typeparam>
public void AddRoute<TShardingEntity>(Expression<Func<TShardingEntity, bool>> predicate) where TShardingEntity : class, IShardingEntity
{
var virtualTable = _virtualTableManager.GetVirtualTable<TShardingEntity>();
if (!ManualPredicate.ContainsKey(virtualTable))
{
ShardingCore.Extensions.ExpressionExtension.And((Expression<Func<TShardingEntity, bool>>) ManualPredicate[virtualTable], predicate);
}
else
{
ManualPredicate.Add(virtualTable, predicate);
}
}
public void AddRoute(Type shardingEntityType,string tail)
{
var virtualTable = _virtualTableManager.GetVirtualTable(shardingEntityType);
AddRoute(virtualTable, tail);
}
public void AddRouteWithGengric<TShardingEntity>(string tail) where TShardingEntity : class, IShardingEntity
{
AddRoute(typeof(TShardingEntity), tail);
}
public void AddRoute(IVirtualTable virtualTable, string tail)
{
if (ManualTails.ContainsKey(virtualTable))
{
var tails = ManualTails[virtualTable];
if (!tails.Contains(tail))
{
tails.Add(tail);
}
}
else
{
ManualTails.Add(virtualTable, new HashSet<string>()
{
tail
});
}
}
}
}

View File

@ -0,0 +1,26 @@
using System;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.Internal.RoutingRuleEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 13:31:06
* @Email: 326308290@qq.com
*/
public class RoutingRuleEngineFactory : IRoutingRuleEngineFactory
{
private readonly IRouteRuleEngine _routeRuleEngine;
public RoutingRuleEngineFactory(IRouteRuleEngine routeRuleEngine)
{
_routeRuleEngine = routeRuleEngine;
}
public IRouteRuleEngine CreateEngine()
{
return _routeRuleEngine;
}
}
}

View File

@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.GenericMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 17:04:29
* @Email: 326308290@qq.com
*/
internal class GenericMergeEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public GenericMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
private async Task<TResult> EFCoreExecute<TResult>(IQueryable<T> newQueryable,RouteResult routeResult,Func<IQueryable, Task<TResult>> efQuery)
{
using (var scope = _mergeContext.CreateScope())
{
scope.ShardingAccessor.ShardingContext = ShardingContext.Create(routeResult);
return await efQuery(newQueryable);
}
}
public async Task<List<TResult>> Execute<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
//从各个分表获取数据
List<DbContext> parallelDbContexts = new List<DbContext>(_mergeContext.RouteResults.Count());
try
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
return Task.Run(async () =>
{
var shardingDbContext = _mergeContext.CreateDbContext();
parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.Source.ReplaceDbContextQueryable(shardingDbContext);
return await EFCoreExecute(newQueryable,routeResult,efQuery);
});
}).ToArray();
return (await Task.WhenAll(enumeratorTasks)).ToList();
}
finally
{
parallelDbContexts.ForEach(o => o.Dispose());
}
}
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.Internal.RoutingRuleEngines;
namespace ShardingCore.Core.Internal.StreamMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 16:51:41
* @Email: 326308290@qq.com
*/
internal interface IStreamMergeContextFactory
{
StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IEnumerable<RouteResult> routeResults);
}
}

View File

@ -15,13 +15,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
#if !EFCORE2
internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private readonly StreamMergeContext _mergeContext;
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
private readonly PriorityQueue<OrderMergeItem<T>> _queue;
private bool skipFirst;
private IAsyncEnumerator<T> _currentEnumerator;
public OrderAsyncEnumerator(StreamMergeContext mergeContext,List<IAsyncEnumerator<T>> sources)
public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;
@ -81,13 +81,13 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
internal class OrderAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private readonly StreamMergeContext _mergeContext;
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
private readonly PriorityQueue<OrderMergeItem<T>> _queue;
private bool skipFirst;
private IAsyncEnumerator<T> _currentEnumerator;
public OrderAsyncEnumerator(StreamMergeContext mergeContext, List<IAsyncEnumerator<T>> sources)
public OrderAsyncEnumerator(StreamMergeContext<T> mergeContext, List<IAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;

View File

@ -19,12 +19,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext _mergeContext;
private readonly StreamMergeContext<T> _mergeContext;
private readonly IAsyncEnumerator<T> _source;
private List<IComparable> _orderValues;
public OrderMergeItem(StreamMergeContext mergeContext, IAsyncEnumerator<T> source)
public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
{
_mergeContext = mergeContext;
_source = source;
@ -87,12 +87,12 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
/// <summary>
/// 合并数据上下文
/// </summary>
private readonly StreamMergeContext _mergeContext;
private readonly StreamMergeContext<T> _mergeContext;
private readonly IAsyncEnumerator<T> _source;
private List<IComparable> _orderValues;
public OrderMergeItem(StreamMergeContext mergeContext, IAsyncEnumerator<T> source)
public OrderMergeItem(StreamMergeContext<T> mergeContext, IAsyncEnumerator<T> source)
{
_mergeContext = mergeContext;
_source = source;

View File

@ -1,5 +1,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
{
@ -12,10 +15,10 @@ namespace ShardingCore.Core.Internal.StreamMerge.ListMerge
internal class StreamMergeListEngine<T>
{
private const int defaultCapacity = 0x10;//默认容量为16
private readonly StreamMergeContext _mergeContext;
private readonly StreamMergeContext<T> _mergeContext;
private readonly List<IAsyncEnumerator<T>> _sources;
public StreamMergeListEngine(StreamMergeContext mergeContext,List<IAsyncEnumerator<T>> sources)
public StreamMergeListEngine(StreamMergeContext<T> mergeContext,List<IAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_sources = sources;

View File

@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Core.Internal.StreamMerge.ListSourceMerges
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 14:45:19
* @Email: 326308290@qq.com
*/
internal class StreamMergeListSourceEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public StreamMergeListSourceEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable,RouteResult routeResult)
{
using (var scope = _mergeContext.CreateScope())
{
scope.ShardingAccessor.ShardingContext = ShardingContext.Create(routeResult);
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
#endif
#if EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
#endif
return enumator;
}
}
public async Task<List<T>> Execute()
{
//去除分页,获取前Take+Skip数量
var noPageSource = _mergeContext.Source.RemoveTake().RemoveSkip();
if (_mergeContext.Take.HasValue)
noPageSource = noPageSource.Take(_mergeContext.Take.Value + _mergeContext.Skip.GetValueOrDefault());
//从各个分表获取数据
List<DbContext> parallelDbContexts = new List<DbContext>(_mergeContext.RouteResults.Count());
try
{
var enumeratorTasks = _mergeContext.RouteResults.Select(routeResult =>
{
return Task.Run(async () =>
{
var shardingDbContext = _mergeContext.CreateDbContext();
parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) noPageSource.ReplaceDbContextQueryable(shardingDbContext);
#if !EFCORE2
return await GetAsyncEnumerator(newQueryable,routeResult);
#endif
#if EFCORE2
return await GetAsyncEnumerator(newQueryable,routeResult);
#endif
});
}).ToArray();
var enumerators = (await Task.WhenAll(enumeratorTasks)).ToList();
var engine = new StreamMergeListEngine<T>(_mergeContext, enumerators);
return await engine.Execute();
}
finally
{
parallelDbContexts.ForEach(o => o.Dispose());
}
}
}
}

View File

@ -1,5 +1,11 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.DbContexts;
using ShardingCore.Extensions;
namespace ShardingCore.Core.Internal.StreamMerge
{
@ -9,17 +15,38 @@ namespace ShardingCore.Core.Internal.StreamMerge
* @Date: Monday, 25 January 2021 11:38:27
* @Email: 326308290@qq.com
*/
internal class StreamMergeContext
internal class StreamMergeContext<T>
{
public StreamMergeContext(int? skip, int? take, IEnumerable<PropertyOrder> orders = null)
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
private readonly IShardingScopeFactory _shardingScopeFactory;
public IQueryable<T> Source { get; }
public IEnumerable<RouteResult> RouteResults { get; }
public int? Skip { get; private set; }
public int? Take { get; private set; }
public IEnumerable<PropertyOrder> Orders { get; private set; }
public StreamMergeContext(IQueryable<T> source,IEnumerable<RouteResult> routeResults,
IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
{
Skip = skip;
Take = take;
Orders = orders??new List<PropertyOrder>();
_shardingParallelDbContextFactory = shardingParallelDbContextFactory;
_shardingScopeFactory = shardingScopeFactory;
Source = source;
RouteResults = routeResults;
var extraEntry = source.GetExtraEntry();
Skip = extraEntry.Skip;
Take = extraEntry.Take;
Orders = extraEntry.Orders ?? Enumerable.Empty<PropertyOrder>();
}
public DbContext CreateDbContext()
{
return _shardingParallelDbContextFactory.Create(string.Empty);
}
public ShardingScope CreateScope()
{
return _shardingScopeFactory.CreateScope();
}
public int? Skip { get; set; }
public int? Take { get; set; }
public IEnumerable<PropertyOrder> Orders { get; set; }
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.DbContexts;
namespace ShardingCore.Core.Internal.StreamMerge
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 16:52:43
* @Email: 326308290@qq.com
*/
internal class StreamMergeContextFactory:IStreamMergeContextFactory
{
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
private readonly IShardingScopeFactory _shardingScopeFactory;
public StreamMergeContextFactory(IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
{
_shardingParallelDbContextFactory = shardingParallelDbContextFactory;
_shardingScopeFactory = shardingScopeFactory;
}
public StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IEnumerable<RouteResult> routeResults)
{
return new StreamMergeContext<T>(queryable, routeResults, _shardingParallelDbContextFactory, _shardingScopeFactory);
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.PhysicTables
{
@ -13,17 +14,18 @@ namespace ShardingCore.Core.PhysicTables
/// </summary>
public class DefaultPhysicTable:IPhysicTable
{
public DefaultPhysicTable(string originalName, string tailPrefix, string tail, Type virtualType)
public DefaultPhysicTable(string originalName, IVirtualTable virtualTable, string tail)
{
VirtualTable = virtualTable;
OriginalName = originalName;
TailPrefix = tailPrefix;
Tail = tail;
VirtualType = virtualType;
}
public string FullName => $"{OriginalName}{TailPrefix}{Tail}";
public string OriginalName { get; }
public string TailPrefix { get; }
public string TailPrefix =>VirtualTable.ShardingConfig.TailPrefix;
public string Tail { get; }
public Type VirtualType { get; }
public Type VirtualType => VirtualTable.EntityType;
public IVirtualTable VirtualTable { get; }
}
}

View File

@ -1,4 +1,5 @@
using System;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.PhysicTables
{
@ -30,6 +31,7 @@ namespace ShardingCore.Core.PhysicTables
/// 映射类类型
/// </summary>
Type VirtualType { get; }
IVirtualTable VirtualTable { get; }
}
}

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Extensions;
@ -12,8 +13,12 @@ namespace ShardingCore.Core.ShardingAccessors
*/
public class ShardingContext
{
private ShardingContext()
private ShardingContext(RouteResult routeResult)
{
foreach (var physicTable in routeResult.ReplaceTables)
{
_shardingTables.Add(physicTable.VirtualTable, new List<string>(1){physicTable.Tail});
}
}
/// <summary>
@ -36,9 +41,9 @@ namespace ShardingCore.Core.ShardingAccessors
/// 创建一个分表上下文
/// </summary>
/// <returns></returns>
public static ShardingContext Create()
public static ShardingContext Create(RouteResult routeResult)
{
return new ShardingContext();
return new ShardingContext(routeResult);
}
/// <summary>

View File

@ -5,14 +5,16 @@ using System.Linq.Expressions;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Core.Internal.StreamMerge.GenericMerges;
using ShardingCore.Core.Internal.StreamMerge.ListMerge;
using ShardingCore.Core.Internal.StreamMerge.ListSourceMerges;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.Extensions;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
@ -29,23 +31,25 @@ namespace ShardingCore.Core
/// 分表查询构造器
/// </summary>
/// <typeparam name="T"></typeparam>
public class ShardingQueryable<T> : IShardingQueryable<T>
public class ShardingQueryable<T> : IShardingQueryable<T>
{
private IQueryable<T> _source;
private bool _autoParseRoute = true;
private readonly IShardingScopeFactory _shardingScopeFactory;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
public Dictionary<Type, Expression> _routes = new Dictionary<Type, Expression>();
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
private Dictionary<Type, Expression> _routes = new Dictionary<Type, Expression>();
private readonly Dictionary<IVirtualTable, List<string>> _endRoutes = new Dictionary<IVirtualTable, List<string>>();
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
private readonly RouteRuleContext<T> _routeRuleContext;
public ShardingQueryable(IQueryable<T> source)
{
_source = source;
_shardingScopeFactory = ShardingContainer.Services.GetService<IShardingScopeFactory>();
_virtualTableManager = ShardingContainer.Services.GetService<IVirtualTableManager>();
_shardingParallelDbContextFactory = ShardingContainer.Services.GetService<IShardingParallelDbContextFactory>();
_streamMergeContextFactory = ShardingContainer.Services.GetService<IStreamMergeContextFactory>();
_routingRuleEngineFactory=ShardingContainer.Services.GetService<IRoutingRuleEngineFactory>();
_routeRuleContext=new RouteRuleContext<T>(source,_virtualTableManager);
}
@ -97,28 +101,40 @@ namespace ShardingCore.Core
return this;
}
private StreamMergeContext<T> GetContext()
{
var routeRuleEngine = _routingRuleEngineFactory.CreateEngine();
var routeResults = routeRuleEngine.Route(_routeRuleContext);
return _streamMergeContextFactory.Create(_source, routeResults);
}
private async Task<List<TResult>> GetGenericMergeEngine<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
return await new GenericMergeEngine<T>(GetContext()).Execute(efQuery);
}
public async Task<int> CountAsync()
{
var shardingCounts = await GetShardingQueryAsync(async queryable =>await EntityFrameworkQueryableExtensions.CountAsync((IQueryable<T>) queryable));
var shardingCounts = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.CountAsync((IQueryable<T>) queryable));
return shardingCounts.Sum();
}
public async Task<long> LongCountAsync()
{
var shardingCounts = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.LongCountAsync((IQueryable<T>) queryable));
var shardingCounts = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.LongCountAsync((IQueryable<T>) queryable));
return shardingCounts.Sum();
}
public async Task<List<T>> ToListAsync()
{
return await GetShardingListQueryAsync();
return await new StreamMergeListSourceEngine<T>(GetContext()).Execute();
}
public async Task<T> FirstOrDefaultAsync()
{
var result = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable<T>) queryable));
var result = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable<T>) queryable));
var q = result.Where(o => o != null).AsQueryable();
var extraEntry = _source.GetExtraEntry();
@ -130,19 +146,19 @@ namespace ShardingCore.Core
public async Task<bool> AnyAsync()
{
return (await GetShardingQueryAsync(x => EntityFrameworkQueryableExtensions.AnyAsync((IQueryable<T>) x)))
return (await GetGenericMergeEngine(x => EntityFrameworkQueryableExtensions.AnyAsync((IQueryable<T>) x)))
.Any(o => o);
}
public async Task<T> MaxAsync()
{
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.MaxAsync((IQueryable<T>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.MaxAsync((IQueryable<T>) queryable));
return results.Max();
}
public async Task<T> MinAsync()
{
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.MinAsync((IQueryable<T>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.MinAsync((IQueryable<T>) queryable));
return results.Min();
}
@ -150,7 +166,7 @@ namespace ShardingCore.Core
{
if (typeof(T) != typeof(int))
throw new InvalidOperationException($"{typeof(T)} cast to int failed");
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<int>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<int>) queryable));
return results.Sum();
}
@ -158,7 +174,7 @@ namespace ShardingCore.Core
{
if (typeof(T) != typeof(long))
throw new InvalidOperationException($"{typeof(T)} cast to long failed");
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<long>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<long>) queryable));
return results.Sum();
}
@ -166,7 +182,7 @@ namespace ShardingCore.Core
{
if (typeof(T) != typeof(decimal))
throw new InvalidOperationException($"{typeof(T)} cast to decimal failed");
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<decimal>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<decimal>) queryable));
return results.Sum();
}
@ -174,7 +190,7 @@ namespace ShardingCore.Core
{
if (typeof(T) != typeof(double))
throw new InvalidOperationException($"{typeof(T)} cast to double failed");
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<double>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<double>) queryable));
return results.Sum();
}
@ -182,264 +198,10 @@ namespace ShardingCore.Core
{
if (typeof(T) != typeof(float))
throw new InvalidOperationException($"{typeof(T)} cast to double failed");
var results = await GetShardingQueryAsync(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<float>) queryable));
var results = await GetGenericMergeEngine(async queryable => await EntityFrameworkQueryableExtensions.SumAsync((IQueryable<float>) queryable));
return results.Sum();
}
private void BeginShardingQuery(ShardingScope scope)
{
var context = ShardingContext.Create();
foreach (var route in _endRoutes)
{
context.TryAddShardingTable(route.Key, route.Value);
}
scope.ShardingAccessor.ShardingContext = context;
}
private void GetQueryableRoutes()
{
//先添加手动路由到当前上下文,之后将不再手动路由里面的自动路由添加到当前上下文
foreach (var kv in _routes)
{
var virtualTable = _virtualTableManager.GetVirtualTable(kv.Key);
if (!_endRoutes.ContainsKey(virtualTable))
{
var physicTables = virtualTable.RouteTo(new RouteConfig(null, null, null, kv.Value));
_endRoutes.Add(virtualTable, physicTables.Select(o => o.Tail).ToList());
}
}
if (_autoParseRoute)
{
var shardingEntities = _source.ParseQueryableRoute();
var autoRoutes = shardingEntities.Where(o => !_routes.ContainsKey(o)).ToList();
foreach (var shardingEntity in autoRoutes)
{
var virtualTable = _virtualTableManager.GetVirtualTable(shardingEntity);
if (!_endRoutes.ContainsKey(virtualTable))
{
//路由获取本次操作物理表
var physicTables = virtualTable.RouteTo(new RouteConfig(_source));
_endRoutes.Add(virtualTable, physicTables.Select(o => o.Tail).ToList());
}
}
}
}
private async Task<List<TResult>> GetShardingQueryAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
GetQueryableRoutes();
//本次查询仅一张表是对应多个数据源的情况
if (_endRoutes.Values.Count(o => o.Count > 1) == 1)
{
return await GetShardingMultiQueryAsync(efQuery);
}
else
{
#if DEBUG
if (_endRoutes.Values.Count(o => o.Count > 1) > 1)
{
Console.WriteLine("存在性能可能有问题");
}
#endif
var result= await GetShardingSingleQueryAsync(efQuery);
return result;
}
}
private async Task<List<TResult>> GetShardingSingleQueryAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
using (var scope = _shardingScopeFactory.CreateScope())
{
BeginShardingQuery(scope);
return new List<TResult>() {await efQuery(_source)};
}
}
private async Task<List<TResult>> GetShardingMultiQueryAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery)
{
var extraEntry = _source.GetExtraEntry();
//去除分页,获取前Take+Skip数量
int? take = extraEntry.Take;
int skip = extraEntry.Skip.GetValueOrDefault();
var noPageSource = _source.RemoveTake().RemoveSkip();
if (take.HasValue)
noPageSource = noPageSource.Take(take.Value + skip);
//从各个分表获取数据
var multiRouteEntry = _endRoutes.FirstOrDefault(o => o.Value.Count() > 1);
var tasks = multiRouteEntry.Value.Select(tail =>
{
return Task.Run(async () =>
{
using (var shardingDbContext = _shardingParallelDbContextFactory.Create(string.Empty))
{
var newQ = (IQueryable<T>) noPageSource.ReplaceDbContextQueryable(shardingDbContext);
var shardingQueryable = new ShardingQueryable<T>(newQ);
shardingQueryable.DisableAutoRouteParse();
shardingQueryable.AddManualRoute(multiRouteEntry.Key, tail);
foreach (var singleRouteEntry in _endRoutes.Where(o => o.Key != multiRouteEntry.Key))
{
shardingQueryable.AddManualRoute(singleRouteEntry.Key, singleRouteEntry.Value[0]);
}
return await shardingQueryable.GetShardingQueryAsync(efQuery);
}
});
}).ToArray();
var all = (await Task.WhenAll(tasks)).SelectMany(o => o).ToList();
//合并数据
var resList = all;
if (extraEntry.Orders.Any())
resList = resList.AsQueryable().OrderWithExpression(extraEntry.Orders).ToList();
if (skip > 0)
resList = resList.Skip(skip).ToList();
if (take.HasValue)
resList = resList.Take(take.Value).ToList();
return resList;
}
#region list
private async Task<List<T>> GetShardingListQueryAsync()
{
GetQueryableRoutes();
//本次查询仅一张表是对应多个数据源的情况
if (_endRoutes.Values.Count(o => o.Count > 1) == 1)
{
return await GetShardingMultiListQueryAsync();
}
else
{
#if DEBUG
if (_endRoutes.Values.Count(o => o.Count > 1) > 1)
{
Console.WriteLine("存在性能可能有问题");
}
#endif
return await GetShardingSingleListQueryAsync();
}
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator()
{
using (var scope = _shardingScopeFactory.CreateScope())
{
BeginShardingQuery(scope);
#if !EFCORE2
var enumator = _source.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
#endif
#if EFCORE2
var enumator = _source.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
#endif
return enumator;
}
}
private async Task<List<T>> GetShardingSingleListQueryAsync()
{
using (var scope = _shardingScopeFactory.CreateScope())
{
BeginShardingQuery(scope);
return await _source.ToListAsync();
}
}
private async Task<List<T>> GetShardingMultiListQueryAsync()
{
var extraEntry = _source.GetExtraEntry();
//去除分页,获取前Take+Skip数量
var noPageSource = _source.RemoveTake().RemoveSkip();
if (extraEntry.Take.HasValue)
noPageSource = noPageSource.Take(extraEntry.Take.Value + extraEntry.Skip.GetValueOrDefault());
//从各个分表获取数据
var multiRouteEntry = _endRoutes.FirstOrDefault(o => o.Value.Count() > 1);
List<DbContext> parallelDbContexts = new List<DbContext>(multiRouteEntry.Value.Count);
var enumatorTasks= multiRouteEntry.Value.Select(tail =>
{
return Task.Run(async () =>
{
var shardingDbContext = _shardingParallelDbContextFactory.Create(string.Empty);
parallelDbContexts.Add(shardingDbContext);
var newQ = (IQueryable<T>) noPageSource.ReplaceDbContextQueryable(shardingDbContext);
var shardingQueryable = new ShardingQueryable<T>(newQ);
shardingQueryable.DisableAutoRouteParse();
shardingQueryable.AddManualRoute(multiRouteEntry.Key, tail);
foreach (var singleRouteEntry in _endRoutes.Where(o => o.Key != multiRouteEntry.Key))
{
shardingQueryable.AddManualRoute(singleRouteEntry.Key, singleRouteEntry.Value[0]);
}
#if !EFCORE2
return await shardingQueryable.GetAsyncEnumerator();
#endif
#if EFCORE2
return await shardingQueryable.GetAsyncEnumerator();
#endif
});
}).ToArray();
var enumators = (await Task.WhenAll(enumatorTasks)).ToList();
var engine=new StreamMergeListEngine<T>(new StreamMergeContext(extraEntry.Skip,extraEntry.Take,extraEntry.Orders), enumators);
var result= await engine.Execute();
parallelDbContexts.ForEach(o=>o.Dispose());
return result;
}
#endregion
// private async Task<TResult> GetShardingMultiQueryAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery)
// {
// //去除分页,获取前Take+Skip数量
// int? take = _source.GetTakeCount();
// int skip = _source.GetSkipCount();
// var (sortColumn, sortType) = _source.GetOrderBy();
//
// var noPageSource = _source.RemoveTake().RemoveSkip();
// if (take.HasValue)
// noPageSource = noPageSource.Take(take.Value + skip);
//
// //从各个分表获取数据
// var multiRouteEntry = _endRoutes.FirstOrDefault(o => o.Value.Count() > 1);
// var tasks = multiRouteEntry.Value.Select(tail =>
// {
// return Task.Run(async () =>
// {
// using (var shardingDbContext = _shardingParallelDbContextFactory.Create(string.Empty))
// {
// var newQ = (IQueryable<T>) noPageSource.ReplaceDbContextQueryable(shardingDbContext);
// var shardingQueryable = new ShardingQueryable<T>(newQ);
// shardingQueryable.AddManualRoute(multiRouteEntry.Key, tail);
// foreach (var singleRouteEntry in _endRoutes.Where(o => o.Key != multiRouteEntry.Key))
// {
// shardingQueryable.AddManualRoute(singleRouteEntry.Key, singleRouteEntry.Value[0]);
// }
//
// return await shardingQueryable.GetShardingQueryAsync(efQuery);
// }
// });
// }).ToArray();
// var all=(await Task.WhenAll(tasks)).ToList().SelectMany(o=>o).ToList();
//
// //合并数据
// var resList = all;
// if (!sortColumn.IsNullOrEmpty() && !sortType.IsNullOrEmpty())
// resList = resList.AsQueryable().OrderBy($"{sortColumn} {sortType}").ToList();
// if (skip > 0)
// resList = resList.Skip(skip).ToList();
// if (take.HasValue)
// resList = resList.Take(take.Value).ToList();
//
// return resList;
// }
}
}

View File

@ -25,7 +25,14 @@ namespace ShardingCore.Core.VirtualTables
/// </summary>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
IVirtualTable GetVirtualTable(Type shardingEntityType);
IVirtualTable GetVirtualTable(Type shardingEntityType);
/// <summary>
/// 获取虚拟表 get virtual table by sharding entity type
/// </summary>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
IVirtualTable<T> GetVirtualTable<T>() where T:class,IShardingEntity;
/// <summary>
/// 获取虚拟表 get virtual table by original table name
/// </summary>

View File

@ -55,6 +55,12 @@ namespace ShardingCore.Core.VirtualTables
return virtualTable;
}
public IVirtualTable<T> GetVirtualTable<T>() where T : class, IShardingEntity
{
var shardingEntityType = typeof(T);
return (IVirtualTable<T>)GetVirtualTable(shardingEntityType);
}
public IVirtualTable GetVirtualTable(string originalTableName)
{
return _virtualTables.Values.FirstOrDefault(o => o.GetOriginalTableName() == originalTableName) ?? throw new CreateSqlVirtualTableNotFoundException(originalTableName);

View File

@ -0,0 +1,25 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.Internal.RoutingRuleEngines;
using ShardingCore.Core.Internal.StreamMerge;
namespace ShardingCore
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 13:32:18
* @Email: 326308290@qq.com
*/
public static class DIExtension
{
public static IServiceCollection AddShardingCore(this IServiceCollection services)
{
services.AddScoped<IStreamMergeContextFactory, StreamMergeContextFactory>();
services.AddScoped<IRouteRuleEngine, QueryRouteRuleEngines>();
services.AddScoped<IRoutingRuleEngineFactory, RoutingRuleEngineFactory>();
return services;
}
}
}

View File

@ -12,6 +12,30 @@ namespace ShardingCore.Extensions
*/
public static class LinqExtension
{
#if !EFCORE5
public static HashSet<TSource> ToHashSet<TSource>(
this IEnumerable<TSource> source,
IEqualityComparer<TSource> comparer = null)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
return new HashSet<TSource>(source, comparer);
}
#endif
/// <summary>
/// 求集合的笛卡尔积
/// </summary>
public static IEnumerable<IEnumerable<T>> Cartesian<T>(this IEnumerable<IEnumerable<T>> sequences)
{
IEnumerable<IEnumerable<T>> tempProduct = new[] {Enumerable.Empty<T>()};
return sequences.Aggregate(tempProduct,
(accumulator, sequence) =>
from accseq in accumulator
from item in sequence
select accseq.Concat(new[] {item})
);
}
public static bool IsEmpty<T>(this IEnumerable<T> source)
{
return source == null || !source.Any();

View File

@ -92,7 +92,7 @@ namespace ShardingCore
}
//添加物理表
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable.GetOriginalTableName(), virtualTable.ShardingConfig.TailPrefix, tail, virtualTable.EntityType));
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable.GetOriginalTableName(), virtualTable, tail));
}
}
}