针对非迭代器进行了连接模式的支持,可以保证动态低内存或者低连接数的系统自动或用户手动控制

This commit is contained in:
xuejiaming 2021-12-04 22:36:08 +08:00
parent 584697fc55
commit 055416d3d9
30 changed files with 550 additions and 199 deletions

View File

@ -45,18 +45,26 @@ namespace ShardingCore6x
{
var services = new ServiceCollection();
services.AddDbContext<DefaultDbContext>(o => o.UseMySql("server=127.0.0.1;port=3306;database=db1;userid=root;password=L6yBtV6qNENrwBy7;", new MySqlServerVersion(new Version()))
//UseSqlServer("Data Source=localhost;Initial Catalog=db1;Integrated Security=True;")
services.AddDbContext<DefaultDbContext>(o => o
//.UseMySql("server=127.0.0.1;port=3306;database=db1;userid=root;password=L6yBtV6qNENrwBy7;", new MySqlServerVersion(new Version()))
.UseSqlServer("Data Source=localhost;Initial Catalog=db1;Integrated Security=True;")
, ServiceLifetime.Transient, ServiceLifetime.Transient);
services.AddLogging();
services.AddShardingDbContext<DefaultShardingDbContext>((conStr, builder) => builder.UseMySql(conStr, new MySqlServerVersion(new Version())), ServiceLifetime.Transient, ServiceLifetime.Transient)
services.AddShardingDbContext<DefaultShardingDbContext>((conStr, builder) => builder
//.UseMySql(conStr, new MySqlServerVersion(new Version()))
.UseSqlServer(conStr)
, ServiceLifetime.Transient, ServiceLifetime.Transient)
.Begin(o =>
{
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
o.AutoTrackEntity = false;
}).AddShardingTransaction((connection, builder) => builder.UseMySql(connection, new MySqlServerVersion(new Version())))
.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=db2;userid=root;password=L6yBtV6qNENrwBy7;")//"Data Source=localhost;Initial Catalog=db2;Integrated Security=True;")
}).AddShardingTransaction((connection, builder) => builder
//.UseMySql(connection, new MySqlServerVersion(new Version()))
.UseSqlServer(connection)
)
//.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=db2;userid=root;password=L6yBtV6qNENrwBy7;")
.AddDefaultDataSource("ds0", "Data Source=localhost;Initial Catalog=db2;Integrated Security=True;")
.AddShardingTableRoute(op =>
{
op.AddShardingTableRoute<OrderVirtualTableRoute>();
@ -152,45 +160,45 @@ namespace ShardingCore6x
// var connectionString = _actualConnectionStringManager.GetConnectionString("ds0", false);
// }
//}
[Benchmark]
public async Task CreateQueryable()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1000000, 3000000).ToString();
var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
}
}
[Benchmark]
public async Task DataSourceRouteRuleEngineFactory()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1000000, 3000000).ToString();
var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
_dataSourceRouteRuleEngineFactory.Route(queryable);
}
}
[Benchmark]
public async Task TableRouteRuleEngineFactory()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1000000, 3000000).ToString();
var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
_tableRouteRuleEngineFactory.Route(queryable);
}
}
[Benchmark]
public async Task ShardingCreateStreamMergeContext()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1000000, 3000000).ToString();
var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
var firstOrDefaultAsync = _streamMergeContextFactory.Create(queryable, _defaultShardingDbContext);
}
}
//[Benchmark]
//public async Task CreateQueryable()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1000000, 3000000).ToString();
// var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
// }
//}
//[Benchmark]
//public async Task DataSourceRouteRuleEngineFactory()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1000000, 3000000).ToString();
// var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
// _dataSourceRouteRuleEngineFactory.Route(queryable);
// }
//}
//[Benchmark]
//public async Task TableRouteRuleEngineFactory()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1000000, 3000000).ToString();
// var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
// _tableRouteRuleEngineFactory.Route(queryable);
// }
//}
//[Benchmark]
//public async Task ShardingCreateStreamMergeContext()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1000000, 3000000).ToString();
// var queryable = _defaultShardingDbContext.Set<Order>().Where(o => o.Id == next);
// var firstOrDefaultAsync = _streamMergeContextFactory.Create(queryable, _defaultShardingDbContext);
// }
//}
//[Benchmark]
//public async Task NoRouteParseCache()
//{
@ -213,24 +221,24 @@ namespace ShardingCore6x
// _virtualTable.RouteTo(new ShardingTableRouteConfig(queryable: queryable1));
// }
//}
//[Benchmark]
//public async Task NoShardingFirstOrDefaultAsync()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1, 7000000).ToString();
// var firstOrDefaultAsync = await _defaultDbContext.Set<Order>().FirstOrDefaultAsync(o => o.Id == next);
// }
//}
//[Benchmark]
//public async Task ShardingFirstOrDefaultAsync()
//{
// for (int i = 0; i < N; i++)
// {
// var next = new Random().Next(1, 7000000).ToString();
// var firstOrDefaultAsync = await _defaultShardingDbContext.Set<Order>().FirstOrDefaultAsync(o => o.Id == next);
// }
//}
[Benchmark]
public async Task NoShardingFirstOrDefaultAsync()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1, 7000000).ToString();
var firstOrDefaultAsync = await _defaultDbContext.Set<Order>().FirstOrDefaultAsync(o => o.Id == next);
}
}
[Benchmark]
public async Task ShardingFirstOrDefaultAsync()
{
for (int i = 0; i < N; i++)
{
var next = new Random().Next(1, 7000000).ToString();
var firstOrDefaultAsync = await _defaultShardingDbContext.Set<Order>().FirstOrDefaultAsync(o => o.Id == next);
}
}
//[Benchmark]
//public async Task NoShardingIndexFirstOrDefaultAsync()
//{

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Core
{
/// <summary>
/// 链接模式,可以由用户自行指定,使用内存限制,和连接数限制或者系统自行选择最优
/// 内存限制的意思是最小化内存使用率,就是非一次性获取所有数据然后采用流式聚合
/// 连接限制的意思是最小化连接数使用率,就是单次查询并发连接数为设置的连接数。因为有限制,所以无法一直挂起连接,必须全部获取到内存后进行内存聚合
/// 系统自行选择会根据用户的配置采取最小化连接数但是如果遇到分页则会根据分页策略采取内存限制因为skip过大会导致内存爆炸
/// </summary>
public enum ConnectionModeEnum
{
//内存限制使用流式聚合
MEMORY_LIMIT,
//链接限制使用内存聚合
CONNECTION_LIMIT,
//系统自动选择内存还是流式聚合
SYSTEM_AUTO
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
{
/// <summary>
/// 数据源池
/// </summary>
public interface IPhysicDataSourcePool
{
/// <summary>
/// 添加一个物理数据源
/// </summary>
/// <param name="physicDataSource"></param>
/// <returns></returns>
bool TryAdd(IPhysicDataSource physicDataSource);
/// <summary>
/// 尝试获取一个物理数据源没有返回null
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
IPhysicDataSource TryGet(string dataSourceName);
/// <summary>
/// 获取所有的数据源名称
/// </summary>
/// <returns></returns>
List<string> GetAllDataSourceNames();
}
}

View File

@ -19,6 +19,9 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
public interface IVirtualDataSource
{
/// <summary>
/// 默认的数据源名称
/// </summary>
string DefaultDataSourceName { get; }
/// <summary>
/// 路由到具体的物理数据源
@ -34,37 +37,60 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
/// <returns></returns>
IVirtualDataSourceRoute GetRoute(Type entityType);
ISet<IPhysicDataSource> GetAllPhysicDataSources();
/// <summary>
/// 获取默认的数据源信息
/// </summary>
/// <returns></returns>
IPhysicDataSource GetDefaultDataSource();
/// <summary>
/// 获取数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <exception cref="ShardingCoreInvalidOperationException">
/// <exception cref="ShardingCoreNotFoundException">
/// thrown if data source name is not in virtual data source
/// the length of the buffer
/// </exception>
/// <returns></returns>
IPhysicDataSource GetPhysicDataSource(string dataSourceName);
/// <summary>
/// 获取数据库链接字符串
/// 获取所有的数据源名称
/// </summary>
/// <returns></returns>
List<string> GetAllDataSourceNames();
/// <summary>
/// 获取连接字符串
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreNotFoundException"></exception>
string GetConnectionString(string dataSourceName);
/// <summary>
/// 添加物理表 add physic data source
/// 添加数据源
/// </summary>
/// <param name="physicDataSource"></param>
/// <returns>是否添加成功</returns>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException">重复添加默认数据源</exception>
bool AddPhysicDataSource(IPhysicDataSource physicDataSource);
/// <summary>
/// 添加分库路由
/// </summary>
/// <param name="virtualDataSourceRoute"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException">对象未配置分库</exception>
bool AddVirtualDataSourceRoute(IVirtualDataSourceRoute virtualDataSourceRoute);
/// <summary>
/// 是否默认数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
bool IsDefault(string dataSourceName);
/// <summary>
/// 初始化检查数据源
/// 检查是否配置默认数据源和默认链接字符串
/// </summary>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
void CheckVirtualDataSource();
}
/// <summary>

View File

@ -0,0 +1,34 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
public sealed class PhysicDataSourcePool:IPhysicDataSourcePool
{
private readonly ConcurrentDictionary<string, IPhysicDataSource> _physicDataSources =
new ConcurrentDictionary<string, IPhysicDataSource>();
public bool TryAdd(IPhysicDataSource physicDataSource)
{
return _physicDataSources.TryAdd(physicDataSource.DataSourceName, physicDataSource);
}
public IPhysicDataSource TryGet(string dataSourceName)
{
if (dataSourceName == null) return null;
if (_physicDataSources.TryGetValue(dataSourceName, out var physicDataSource))
return physicDataSource;
return null;
}
public List<string> GetAllDataSourceNames()
{
return _physicDataSources.Keys.ToList();
}
}
}

View File

@ -19,6 +19,9 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
/// 数据源链接
/// </summary>
string ConnectionString { get; }
/// <summary>
/// 是否是默认的数据源
/// </summary>
bool IsDefault { get; }
}
}

View File

@ -6,6 +6,7 @@ using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.ShardingEnumerableQueries;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
@ -28,8 +29,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
private readonly ConcurrentDictionary<Type, IVirtualDataSourceRoute> _dataSourceVirtualRoutes = new ConcurrentDictionary<Type, IVirtualDataSourceRoute>();
private readonly ConcurrentDictionary<string, IPhysicDataSource> _physicDataSources =
new ConcurrentDictionary<string, IPhysicDataSource>();
private readonly IPhysicDataSourcePool _physicDataSourcePool;
public string DefaultDataSourceName { get; private set; }
public string DefaultConnectionString { get; private set; }
@ -37,6 +37,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
public VirtualDataSource(IEntityMetadataManager<TShardingDbContext> entityMetadataManager)
{
_entityMetadataManager = entityMetadataManager;
_physicDataSourcePool = new PhysicDataSourcePool();
}
@ -82,33 +83,56 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
$"entity type :[{entityType.FullName}] not found virtual data source route");
return dataSourceVirtualRoute;
}
public ISet<IPhysicDataSource> GetAllPhysicDataSources()
{
return _physicDataSources.Select(o=>o.Value).ToHashSet();
}
/// <summary>
/// 获取默认的数据源信息
/// </summary>
/// <returns></returns>
public IPhysicDataSource GetDefaultDataSource()
{
return GetPhysicDataSource(DefaultDataSourceName);
}
/// <summary>
/// 获取物理数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreNotFoundException"></exception>
public IPhysicDataSource GetPhysicDataSource(string dataSourceName)
{
Check.NotNull(dataSourceName, "data source name is null,plz confirm IShardingBootstrapper.Star()");
if (!_physicDataSources.TryGetValue(dataSourceName, out var physicDataSource))
throw new ShardingCoreInvalidOperationException($"not found data source that name is :[{dataSourceName}]");
var dataSource = _physicDataSourcePool.TryGet(dataSourceName);
if (null== dataSource)
throw new ShardingCoreNotFoundException($"data source:[{dataSourceName}]");
return physicDataSource;
return dataSource;
}
/// <summary>
/// 获取所有的数据源名称
/// </summary>
/// <returns></returns>
public List<string> GetAllDataSourceNames()
{
return _physicDataSourcePool.GetAllDataSourceNames();
}
/// <summary>
/// 获取数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreNotFoundException"></exception>
public string GetConnectionString(string dataSourceName)
{
if (IsDefault(dataSourceName))
return DefaultConnectionString;
return GetPhysicDataSource(dataSourceName).ConnectionString;
}
/// <summary>
/// 添加数据源
/// </summary>
/// <param name="physicDataSource"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException">重复添加默认数据源</exception>
public bool AddPhysicDataSource(IPhysicDataSource physicDataSource)
{
if (physicDataSource.IsDefault)
@ -120,9 +144,15 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
DefaultDataSourceName = physicDataSource.DataSourceName;
DefaultConnectionString = physicDataSource.ConnectionString;
}
return _physicDataSources.TryAdd(physicDataSource.DataSourceName, physicDataSource);
}
return _physicDataSourcePool.TryAdd(physicDataSource);
}
/// <summary>
/// 添加分库路由
/// </summary>
/// <param name="virtualDataSourceRoute"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException">对象未配置分库</exception>
public bool AddVirtualDataSourceRoute(IVirtualDataSourceRoute virtualDataSourceRoute)
{
if (!virtualDataSourceRoute.EntityMetadata.IsShardingDataSource())
@ -130,16 +160,21 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
return _dataSourceVirtualRoutes.TryAdd(virtualDataSourceRoute.EntityMetadata.EntityType, virtualDataSourceRoute);
}
/// <summary>
/// 是否是默认数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
public bool IsDefault(string dataSourceName)
{
return DefaultDataSourceName == dataSourceName;
}
/// <summary>
/// 检查是否配置默认数据源和默认链接字符串
/// </summary>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public void CheckVirtualDataSource()
{
if (string.IsNullOrWhiteSpace(DefaultDataSourceName))
throw new ShardingCoreInvalidOperationException(
$"virtual data source not inited {nameof(DefaultDataSourceName)} in IShardingDbContext null");

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.DIExtensions
@ -47,6 +48,9 @@ namespace ShardingCore.DIExtensions
ShardingConfigOption.ParallelQueryTimeOut = shardingCoreBeginOptions.ParallelQueryTimeOut;
ShardingConfigOption.CreateShardingTableOnStart = shardingCoreBeginOptions.CreateShardingTableOnStart;
ShardingConfigOption.IgnoreCreateTableError = shardingCoreBeginOptions.IgnoreCreateTableError;
ShardingConfigOption.MaxQueryConnectionsLimit = shardingCoreBeginOptions.MaxQueryConnectionsLimit;
ShardingConfigOption.ConnectionMode = shardingCoreBeginOptions.ConnectionMode;
ShardingConfigOption.UseMemoryLimitWhileSkip = shardingCoreBeginOptions.UseMemoryLimitWhileSkip;
foreach (var entityType in shardingCoreBeginOptions.GetCreateTableEntities())
{
ShardingConfigOption.AddEntityTryCreateTable(entityType);
@ -105,6 +109,9 @@ namespace ShardingCore.DIExtensions
/// 忽略建表时的错误
/// </summary>
public bool? IgnoreCreateTableError { get; set; } = true;
public int MaxQueryConnectionsLimit { get; set; } = Environment.ProcessorCount;
public ConnectionModeEnum ConnectionMode { get; set; } = ConnectionModeEnum.SYSTEM_AUTO;
public int UseMemoryLimitWhileSkip { get; set; } = 10000;
private readonly ISet<Type> _createTableEntities = new HashSet<Type>();

View File

@ -36,7 +36,7 @@ namespace ShardingCore.DIExtensions
/// <summary>
/// 替换比较表达式
/// 比较表达式用于将数据库的数据获取到内存后进行排序由于数据库排序和内存排序针对某种类型可能不一致导致结果和预期不符如guid和unique identifier
/// 默认已经将此类型的比较器已经修复如果有后续其他数据库类型和c#类型排序不一致的请自行实现 <see cref="IShardingComparer<TShardingDbContext>"/>
/// 默认已经将此类型的比较器已经修复如果有后续其他数据库类型和c#类型排序不一致的请自行实现 <see cref="IShardingComparer"/>
/// </summary>
/// <param name="newShardingComparerFactory"></param>
/// <returns></returns>

View File

@ -1,22 +1,23 @@
using System;
using System.Diagnostics.CodeAnalysis;
namespace ShardingCore.Exceptions;
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 04 December 2021 11:26:17
* @Email: 326308290@qq.com
*/
[ExcludeFromCodeCoverage]
public class ShardingCoreConfigException:ShardingCoreException
namespace ShardingCore.Exceptions
{
public ShardingCoreConfigException(string message) : base(message)
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 04 December 2021 11:26:17
* @Email: 326308290@qq.com
*/
[ExcludeFromCodeCoverage]
public class ShardingCoreConfigException : ShardingCoreException
{
}
public ShardingCoreConfigException(string message) : base(message)
{
}
public ShardingCoreConfigException(string message, Exception innerException) : base(message, innerException)
{
public ShardingCoreConfigException(string message, Exception innerException) : base(message, innerException)
{
}
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Exceptions
{
[ExcludeFromCodeCoverage]
public class ShardingCoreNotFoundException:ShardingCoreException
{
public ShardingCoreNotFoundException(string message) : base(message)
{
}
public ShardingCoreNotFoundException(string message, Exception innerException) : base(message, innerException)
{
}
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Sharding.Abstractions;
@ -70,9 +71,26 @@ namespace ShardingCore
/// 并发查询超时时间
/// </summary>
public TimeSpan ParallelQueryTimeOut { get; set; }
/// <summary>
/// 默认数据源名称
/// </summary>
public string DefaultDataSourceName { get; set; }
/// <summary>
/// 默认数据库链接字符串
/// </summary>
public string DefaultConnectionString { get; set; }
/// <summary>
/// 最大查询连接数限制
/// </summary>
public int MaxQueryConnectionsLimit { get; set; }
/// <summary>
/// 连接数限制
/// </summary>
public ConnectionModeEnum ConnectionMode { get; set; }
/// <summary>
/// 当ConnectionMode == SYSTEM_AUTO时生效
/// </summary>
public int UseMemoryLimitWhileSkip { get; set; }
}
public interface IShardingConfigOption<TShardingDbContext>: IShardingConfigOption where TShardingDbContext : DbContext, IShardingDbContext

View File

@ -3,10 +3,12 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControl;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
@ -17,27 +19,9 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractBaseMergeEngine<TEntity>: IAsyncParallelLimit
internal abstract class AbstractBaseMergeEngine<TEntity>
{
private readonly SemaphoreSlim _semaphore;
private readonly Expression _executeExpression;
private readonly TimeSpan _parallelQueryTimeOut;
public AbstractBaseMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
_executeExpression = methodCallExpression;
var shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>()
.FirstOrDefault(o => o.ShardingDbContextType == shardingDbContext.GetType())??throw new ArgumentNullException(nameof(IShardingConfigOption));
_semaphore = new SemaphoreSlim(Math.Max(1, shardingConfigOption.ParallelQueryMaxThreadCount));
_parallelQueryTimeOut = shardingConfigOption.ParallelQueryTimeOut;
}
public AbstractBaseMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
_executeExpression = streamMergeContext.GetOriginalQueryable().Expression;
_semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount()));
_parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut();
}
/// <summary>
/// 异步多线程控制并发
@ -46,33 +30,59 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
/// <param name="executeAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
//public Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
//{
// cancellationToken.ThrowIfCancellationRequested();
// var acquired = this._semaphore.Wait((int)parallelTimeOut, cancellationToken);
// if (acquired)
// {
// var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
// try
// {
// return Task.Run(async () =>
// {
// try
// {
// return await executeAsync();
// }
// finally
// {
// once.Release();
// }
// }, cancellationToken);
// }
// catch (Exception)
// {
// once.Release();
// throw;
// }
// }
// else
// {
// throw new ShardingCoreParallelQueryTimeOutException($"execute async time out:[{timeOut.TotalMilliseconds}ms]");
// }
//}
protected ConnectionModeEnum CalcConnectionMode(ConnectionModeEnum currentConnectionMode, int useMemoryLimitWhileSkip, int maxQueryConnectionsLimit, int sqlCount,int? skip)
{
cancellationToken.ThrowIfCancellationRequested();
var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds;
var acquired = await this._semaphore.WaitAsync((int)parallelTimeOut, cancellationToken);
if (acquired)
switch (currentConnectionMode)
{
var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
try
case ConnectionModeEnum.MEMORY_LIMIT:
case ConnectionModeEnum.CONNECTION_LIMIT: return currentConnectionMode;
default:
{
return await executeAsync();
}
finally
{
once.Release();
if (skip.GetValueOrDefault() > useMemoryLimitWhileSkip)
{
return ConnectionModeEnum.MEMORY_LIMIT;
}
return maxQueryConnectionsLimit < sqlCount
? ConnectionModeEnum.CONNECTION_LIMIT
: ConnectionModeEnum.MEMORY_LIMIT; ;
}
}
else
{
throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint());
}
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
}

View File

@ -15,7 +15,16 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
*/
internal interface IAsyncParallelLimit:IDisposable
{
/// <summary>
/// 并发执行方法
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="executeAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,
CancellationToken cancellationToken = new CancellationToken());
bool AsyncParallelContinue<TResult>(List<TResult> results);
}
}

View File

@ -5,11 +5,13 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
@ -28,7 +30,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
private readonly IQueryable<TEntity> _queryable;
private readonly Expression _secondExpression;
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext):base(methodCallExpression,shardingDbContext)
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
_methodCallExpression = methodCallExpression;
if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2)
@ -83,46 +85,88 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var dataSourceRouteResult = _mergeContext.DataSourceRouteResult;
var maxQueryConnectionsLimit = _mergeContext.GetMaxQueryConnectionsLimit();
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
var waitExecuteQueue = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
return _mergeContext.TableRouteResults.Select(routeResult =>
return _mergeContext.TableRouteResults.Select(routeResult =>new SqlRouteUnit(dataSourceName,routeResult));
}).GroupBy(o=>o.DataSourceName).Select(sqlGroups =>
{
var sqlCount = sqlGroups.Count();
//根据用户配置单次查询期望并发数
int exceptCount =
Math.Max(
0 == sqlCount % maxQueryConnectionsLimit
? sqlCount / maxQueryConnectionsLimit
: sqlCount / maxQueryConnectionsLimit + 1, 1);
//计算应该使用那种链接模式
ConnectionModeEnum connectionMode = CalcConnectionMode(_mergeContext.GetConnectionMode(),
_mergeContext.GetUseMemoryLimitWhileSkip(), maxQueryConnectionsLimit, sqlCount, 0);
var sqlExecutorUnitPartitions = sqlGroups
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode,g.Obj)).ToList()).ToList();
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
})
.Select(executorGroups =>
{
return Task.Run(async () =>
return Task.Run(async() =>
{
//return new RouteQueryResult<TResult>(dataSourceName, routeResult, default);
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
LinkedList<RouteQueryResult<TResult>> result = new LinkedList<RouteQueryResult<TResult>>();
foreach (var executorGroup in executorGroups)
{
var executorGroupParallelExecuteTasks = executorGroup.Groups.Select(executor =>
{
return Task.Run(async () =>
{
var dataSourceName = executor.RouteUnit.DataSourceName;
var routeResult = executor.RouteUnit.TableRouteResult;
return await AsyncParallelResultExecute(asyncExecuteQueryable, dataSourceName, routeResult, efQuery,
cancellationToken);
});
var asyncExecuteQueryable =
CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
});
}).ToArray();
return (await Task.WhenAll(enumeratorTasks)).ToList();
var queryResult = await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//return await AsyncParallelResultExecute(asyncExecuteQueryable, dataSourceName,
// routeResult, efQuery,
// cancellationToken);
},cancellationToken);
}).ToArray();
var routeQueryResults = (await Task.WhenAll(executorGroupParallelExecuteTasks)).ToList();
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult);
}
}
return result;
},cancellationToken);
}).ToArray();
return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o=>o).ToList();
}
/// <summary>
/// 异步并发查询
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="queryable"></param>
/// <param name="dataSourceName"></param>
/// <param name="routeResult"></param>
/// <param name="efQuery"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<RouteQueryResult<TResult>> AsyncParallelResultExecute<TResult>(IQueryable queryable,string dataSourceName,TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery,
CancellationToken cancellationToken = new CancellationToken())
{
return AsyncParallelLimitExecuteAsync(async () =>
{
var queryResult = await efQuery(queryable);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
},cancellationToken);
}
///// <summary>
///// 异步并发查询
///// </summary>
///// <typeparam name="TResult"></typeparam>
///// <param name="queryable"></param>
///// <param name="dataSourceName"></param>
///// <param name="routeResult"></param>
///// <param name="efQuery"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//public async Task<RouteQueryResult<TResult>> AsyncParallelResultExecute<TResult>(IQueryable queryable,string dataSourceName,TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery,
// CancellationToken cancellationToken = new CancellationToken())
//{
// cancellationToken.ThrowIfCancellationRequested();
// var queryResult = await efQuery(queryable);
// return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//}
public virtual IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
{

View File

@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
public StreamMergeContext<TEntity> StreamMergeContext { get; }
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext):base(streamMergeContext)
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
StreamMergeContext = streamMergeContext;
}
@ -100,22 +100,20 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
public async Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
CancellationToken cancellationToken = new CancellationToken())
{
return AsyncParallelLimitExecuteAsync(async () =>
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}, cancellationToken);
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
/// <summary>

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.MergeEngines.Common
{
public sealed class SqlExecutorGroup<T>
{
public SqlExecutorGroup(List<T> groups)
{
Groups = groups;
}
/// <summary>
/// 执行组
/// </summary>
public List<T> Groups { get; }
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core;
namespace ShardingCore.Sharding.MergeEngines.Common
{
public class SqlExecutorUnit
{
public SqlExecutorUnit(ConnectionModeEnum connectionMode, SqlRouteUnit routeUnit)
{
ConnectionMode = connectionMode;
RouteUnit = routeUnit;
}
public SqlRouteUnit RouteUnit { get; }
public ConnectionModeEnum ConnectionMode { get; }
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Sharding.MergeEngines.Common
{
public sealed class SqlRouteUnit
{
public SqlRouteUnit(string dataSourceName, TableRouteResult tableRouteResult)
{
DataSourceName = dataSourceName;
TableRouteResult = tableRouteResult;
}
public string DataSourceName { get; }
public TableRouteResult TableRouteResult { get; }
}
}

View File

@ -38,7 +38,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
{
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult);
return await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
});
},cancellationToken);
});
}).ToArray();

View File

@ -10,6 +10,7 @@ using ShardingCore.Sharding.Abstractions;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
@ -194,6 +195,18 @@ namespace ShardingCore.Sharding
{
return _shardingConfigOption.ParallelQueryTimeOut;
}
public int GetMaxQueryConnectionsLimit()
{
return _shardingConfigOption.MaxQueryConnectionsLimit;
}
public ConnectionModeEnum GetConnectionMode()
{
return _shardingConfigOption.ConnectionMode;
}
public int GetUseMemoryLimitWhileSkip()
{
return _shardingConfigOption.UseMemoryLimitWhileSkip;
}
/// <summary>
/// 是否是跨资源查询
/// </summary>

View File

@ -7,6 +7,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
@ -240,5 +241,8 @@ namespace ShardingCore
public string DefaultDataSourceName { get; set; }
public string DefaultConnectionString { get; set; }
public int MaxQueryConnectionsLimit { get; set; } = Environment.ProcessorCount;
public ConnectionModeEnum ConnectionMode { get; set; } = ConnectionModeEnum.SYSTEM_AUTO;
public int UseMemoryLimitWhileSkip { get; set; } = 10000;
}
}

View File

@ -146,7 +146,7 @@ namespace ShardingCore.Test
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -127,7 +127,7 @@ namespace ShardingCore.Test
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -136,7 +136,7 @@ namespace ShardingCore.Test2x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -126,7 +126,7 @@ namespace ShardingCore.Test2x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -136,7 +136,7 @@ namespace ShardingCore.Test3x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -127,7 +127,7 @@ namespace ShardingCore.Test3x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -136,7 +136,7 @@ namespace ShardingCore.Test5x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();

View File

@ -127,7 +127,7 @@ namespace ShardingCore.Test5x
}
catch (Exception e)
{
Assert.Equal(typeof(ShardingCoreInvalidOperationException), e.GetType());
Assert.Equal(typeof(ShardingCoreNotFoundException), e.GetType());
}
var queryable = new List<string>().Select(o => new SequenceClass { Id = "123", T = o }).AsQueryable();