先暂存一下分库支持还有一些错误之后处理

This commit is contained in:
xuejiaming 2021-09-18 17:36:28 +08:00
parent 1c1d6cf965
commit a947a18b90
60 changed files with 1419 additions and 880 deletions

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Core
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/17 13:21:14
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingDataSource
{
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Core
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/17 13:21:04
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingTable
{
}
}

View File

@ -18,7 +18,7 @@ namespace ShardingCore.Core.PhysicTables
public DefaultPhysicTable(IVirtualTable virtualTable, string tail)
{
VirtualTable = virtualTable;
OriginalName = virtualTable.GetOriginalTableName();
OriginalName = virtualTable.GetVirtualTableName();
Tail = tail;
EntityType = VirtualTable.EntityType;
}

View File

@ -11,7 +11,6 @@ namespace ShardingCore.Core.PhysicTables
*/
public interface IPhysicTable
{
string DSName { get;}
/// <summary>
/// 表全称
/// </summary>

View File

@ -18,21 +18,29 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
/// </summary>
public interface IVirtualDataSource
{
Type EntityType{get;}
Type ShardingDbContextType{get; }
string DefaultDataSourceName { get; }
/// <summary>
/// 路由到具体的物理数据源
/// </summary>
/// <returns></returns>
List<IPhysicDataSource> RouteTo(ShardingDataSourceRouteConfig routeRouteConfig);
/// <returns>data source names</returns>
List<string> RouteTo(Type entityType,ShardingDataSourceRouteConfig routeRouteConfig);
/// <summary>
/// 获取当前数据源的路由
/// </summary>
/// <returns></returns>
IVirtualDataSourceRoute GetRoute();
IVirtualDataSourceRoute GetRoute(Type entityType);
ISet<IPhysicDataSource> GetAllPhysicDataSources();
IPhysicDataSource GetDefaultDataSource();
/// <summary>
/// 获取数据源
/// </summary>
/// <param name="dataSourceName"></param>
/// <returns></returns>
IPhysicDataSource GetPhysicDataSource(string dataSourceName);
/// <summary>
/// 添加物理表 add physic data source
@ -41,21 +49,5 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
/// <returns>是否添加成功</returns>
bool AddPhysicDataSource(IPhysicDataSource physicDataSource);
/// <summary>
/// add virtual table
/// </summary>
/// <param name="dsname"></param>
/// <param name="virtualTable"></param>
/// <returns></returns>
bool AddVirtualTable(string dsname, IVirtualTable virtualTable);
/// <summary>
/// 获取所有的虚拟表
/// </summary>
/// <returns></returns>
ISet<IVirtualTable> GetVirtualTables();
}
public interface IVirtualDataSource<T> : IVirtualDataSource where T : class
{
new IVirtualDataSourceRoute<T> GetRoute();
}
}

View File

@ -1,71 +1,35 @@
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Core.VirtualDataSources
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 06 February 2021 14:24:01
* @Email: 326308290@qq.com
*/
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 06 February 2021 14:24:01
* @Email: 326308290@qq.com
*/
public interface IVirtualDataSourceManager
{
IPhysicDataSource GetDataSource(string dsName);
/// <summary>
/// 添加链接
/// </summary>
/// <param name="physicDataSource"></param>
void AddPhysicDataSource(IPhysicDataSource physicDataSource);
/// <summary>
/// 获取默认的数据源
/// </summary>
/// <param name="shardingDbContextType"></param>
/// <returns></returns>
IPhysicDataSource GetDefaultDataSource(Type shardingDbContextType);
IVirtualDataSource GetVirtualDataSource(Type shardingDbContextType, Type entityType);
/// <summary>
/// 获取虚拟表 get virtual table by sharding entity type
/// </summary>
/// <returns></returns>
IVirtualDataSource<T> GetVirtualDataSource<T>() where T : class, IShardingDataSource;
/// <summary>
/// 获取
/// </summary>
/// <param name="shardingDbContextType"></param>
/// <param name="entityType"></param>
/// <returns></returns>
List<IPhysicDataSource> GetDefaultDataSources(Type shardingDbContextType,Type entityType);
bool AddPhysicDataSource(IPhysicDataSource physicDataSource);
IVirtualDataSource GetVirtualDataSource();
IPhysicDataSource GetDefaultDataSource();
string GetDefaultDataSourceName();
IPhysicDataSource GetPhysicDataSource(string dataSourceName);
}
public interface IVirtualDataSourceManager<TShardingDbContext> : IVirtualDataSourceManager where TShardingDbContext : DbContext, IShardingDbContext
{
/// <summary>
/// 添加虚拟数据源应用启动时 add virtual table when app start
/// </summary>
/// <param name="virtualDataSource"></param>
void AddVirtualDataSource(IVirtualDataSource virtualDataSource);
/// <summary>
/// 添加链接对应的对象
/// </summary>
/// <param name="connectKey"></param>
/// <param name="entityType"></param>
void AddConnectEntities(string connectKey,Type entityType);
/// <summary>
/// 获取虚拟表 get virtual table by sharding entity type
/// </summary>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
IVirtualDataSource GetVirtualDataSource(Type shardingEntityType);
List<string> GetEntityTypeLinkedConnectKeys(Type shardingEntityType);
}
}

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.VirtualDataSources;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
{
@ -14,24 +13,23 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
*/
public class DefaultPhysicDataSource:IPhysicDataSource
{
public DefaultPhysicDataSource(string dsName, string connectionString, IVirtualDataSource virtualDataSource)
public DefaultPhysicDataSource(string dsName, string connectionString, bool isDefault)
{
DSName = dsName;
DataSourceName = dsName;
ConnectionString = connectionString;
VirtualDataSource = virtualDataSource;
EntityType = virtualDataSource.EntityType;
IsDefault = isDefault;
}
public string DSName { get; }
public string DataSourceName { get; }
public string ConnectionString { get; }
public Type EntityType { get; }
public IVirtualDataSource VirtualDataSource { get; }
public bool IsDefault { get; }
protected bool Equals(DefaultPhysicDataSource other)
{
return DSName == other.DSName;
return DataSourceName == other.DataSourceName;
}
public override bool Equals(object obj)
@ -44,7 +42,8 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
public override int GetHashCode()
{
return (DSName != null ? DSName.GetHashCode() : 0);
return (DataSourceName != null ? DataSourceName.GetHashCode() : 0);
}
}
}

View File

@ -1,5 +1,4 @@
using System;
using ShardingCore.Core.VirtualDataSources;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
{
@ -15,15 +14,11 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources
/// <summary>
/// data source name
/// </summary>
string DSName { get; }
string DataSourceName { get; }
/// <summary>
/// 数据源链接
/// </summary>
string ConnectionString { get; }
/// <summary>
/// 映射类类型
/// </summary>
Type EntityType { get; }
IVirtualDataSource VirtualDataSource { get; }
bool IsDefault { get; }
}
}

View File

@ -12,89 +12,90 @@ using ShardingCore.Utils;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 05 February 2021 15:21:04
* @Email: 326308290@qq.com
*/
public class VirtualDataSource<T>:IVirtualDataSource<T> where T:class
/*
* @Author: xjm
* @Description:
* @Date: Friday, 05 February 2021 15:21:04
* @Email: 326308290@qq.com
*/
public class VirtualDataSource : IVirtualDataSource
{
private readonly IVirtualDataSourceRoute<T> _dataSourceVirtualRoute;
public ShardingEntityConfig ShardingEntityType { get; }
private readonly ConcurrentDictionary<Type, IVirtualDataSourceRoute> _dataSourceVirtualRoutes = new ConcurrentDictionary<Type, IVirtualDataSourceRoute>();
public Type EntityType { get; }
private readonly ConcurrentDictionary<IPhysicDataSource, object> _physicDataSources = new ConcurrentDictionary<IPhysicDataSource, object>();
private readonly ConcurrentDictionary<string, IPhysicDataSource> _physicDataSources =
new ConcurrentDictionary<string, IPhysicDataSource>();
public string DefaultDataSourceName { get; }
public Type ShardingDbContextType { get; }
private readonly ConcurrentDictionary<string, IVirtualTable> _virtualTables =
new ConcurrentDictionary<string, IVirtualTable>();
public VirtualDataSource(IVirtualDataSourceRoute<T> virtualRoute)
public VirtualDataSource(Type shardingDbContextType,string defaultDataSourceName)
{
_dataSourceVirtualRoute = virtualRoute;
EntityType = typeof(T);
ShardingEntityType = ShardingUtil.Parse(EntityType);
ShardingDbContextType = shardingDbContextType;
DefaultDataSourceName = defaultDataSourceName??throw new ArgumentNullException(nameof(defaultDataSourceName));
}
public List<IPhysicDataSource> RouteTo(ShardingDataSourceRouteConfig routeRouteConfig)
public List<string> RouteTo(Type entityType,ShardingDataSourceRouteConfig routeRouteConfig)
{
var shardingEntityConfig = ShardingUtil.Parse(entityType);
var virtualDataSourceRoute = GetRoute( entityType);
if (routeRouteConfig.UseQueryable())
return _dataSourceVirtualRoute.RouteWithWhere(GetAllPhysicDataSources(), routeRouteConfig.GetQueryable());
return virtualDataSourceRoute.RouteWithWhere(routeRouteConfig.GetQueryable());
if (routeRouteConfig.UsePredicate())
return _dataSourceVirtualRoute.RouteWithWhere(GetAllPhysicDataSources(), new EnumerableQuery<T>((Expression<Func<T, bool>>) routeRouteConfig.GetPredicate()));
return virtualDataSourceRoute.RouteWithWhere((IQueryable)Activator.CreateInstance(typeof(EnumerableQuery<>).MakeGenericType(entityType), routeRouteConfig.UsePredicate()));
object shardingKeyValue = null;
if (routeRouteConfig.UseValue())
shardingKeyValue = routeRouteConfig.GetShardingKeyValue();
if (routeRouteConfig.UseEntity())
shardingKeyValue = routeRouteConfig.GetShardingDataSource().GetPropertyValue(ShardingEntityType.ShardingDataSourceField);
shardingKeyValue = routeRouteConfig.GetShardingDataSource().GetPropertyValue(shardingEntityConfig.ShardingDataSourceField);
if (shardingKeyValue != null)
{
var routeWithValue = _dataSourceVirtualRoute.RouteWithValue(GetAllPhysicDataSources(), shardingKeyValue);
return new List<IPhysicDataSource>(1) {routeWithValue};
var dataSourceName = virtualDataSourceRoute.RouteWithValue(shardingKeyValue);
return new List<string>(1) { dataSourceName };
}
throw new NotImplementedException(nameof(ShardingDataSourceRouteConfig));
}
IVirtualDataSourceRoute<T> IVirtualDataSource<T>.GetRoute()
public IVirtualDataSourceRoute GetRoute(Type entityType)
{
return _dataSourceVirtualRoute;
}
if(!entityType.IsShardingDataSource())
throw new InvalidOperationException(
$"entity type :[{entityType.FullName}] not impl [{nameof(IShardingDataSource)}]");
public IVirtualDataSourceRoute GetRoute()
{
return _dataSourceVirtualRoute;
if (!_dataSourceVirtualRoutes.TryGetValue(entityType, out var dataSourceVirtualRoute))
throw new InvalidOperationException(
$"entity type :[{entityType.FullName}] not found virtual data source route");
return dataSourceVirtualRoute;
}
public ISet<IPhysicDataSource> GetAllPhysicDataSources()
{
return _physicDataSources.Keys.ToHashSet();
return _physicDataSources.Values.ToHashSet();
}
public IPhysicDataSource GetDefaultDataSource()
{
return GetPhysicDataSource(DefaultDataSourceName);
}
public IPhysicDataSource GetPhysicDataSource(string dataSourceName)
{
if (!_physicDataSources.TryGetValue(dataSourceName, out var physicDataSource))
throw new InvalidOperationException($"not found data source that name is :[{dataSourceName}]");
return physicDataSource;
}
public bool AddPhysicDataSource(IPhysicDataSource physicDataSource)
{
if (physicDataSource.EntityType != EntityType)
throw new InvalidOperationException($"virtual data source entity type :[{EntityType.FullName}] physic data source entity type:[{physicDataSource.EntityType.FullName}]");
return _physicDataSources.TryAdd(physicDataSource, null);
}
public bool AddVirtualTable(string dsname, IVirtualTable virtualTable)
{
if (virtualTable.EntityType != EntityType)
throw new InvalidOperationException($"virtual data source entity:{EntityType.FullName},virtual table entity:{virtualTable.EntityType.FullName}");
if (_physicDataSources.Keys.All(o => o.DSName != dsname))
throw new InvalidOperationException($"data source name:[{dsname}] not found virtual data source");
return _virtualTables.TryAdd(dsname, virtualTable);
}
public ISet<IVirtualTable> GetVirtualTables()
{
return _virtualTables.Values.ToHashSet();
if (physicDataSource.IsDefault && physicDataSource.DataSourceName != DefaultDataSourceName)
throw new InvalidOperationException($"default data source name:[{DefaultDataSourceName}],add physic default data source name:[{physicDataSource.DataSourceName}]");
return _physicDataSources.TryAdd(physicDataSource.DataSourceName, physicDataSource);
}
}
}

View File

@ -2,12 +2,10 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualRoutes;
namespace ShardingCore.Core.VirtualDataSources
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
/*
* @Author: xjm
@ -18,10 +16,12 @@ namespace ShardingCore.Core.VirtualDataSources
public class VirtualDataSourceManager : IVirtualDataSourceManager
{
private readonly IServiceProvider _serviceProvider;
/// <summary>
/// {sharding db context type :{entity type:virtual data source}}
/// </summary>
private readonly ConcurrentDictionary<Type, IVirtualDataSource> _virtualDataSources = new ConcurrentDictionary<Type, IVirtualDataSource>();
private readonly Dictionary<string, ISet<Type>> _shardingConnectKeys = new Dictionary<string,ISet<Type>>();
private readonly Dictionary<Type, ISet<string>> _entityTypeConnectKeyIndex = new Dictionary<Type, ISet<string>>();
public VirtualDataSourceManager(IServiceProvider serviceProvider)
{
@ -40,67 +40,33 @@ namespace ShardingCore.Core.VirtualDataSources
//}
}
public ISet<string> GetAllShardingConnectKeys()
public bool AddPhysicDataSource(Type shardingDbContextType, IPhysicDataSource physicDataSource)
{
return _shardingConnectKeys.Keys.ToHashSet();
throw new NotImplementedException();
}
public List<IVirtualDataSource> GetAllDataSources()
public IVirtualDataSource GetVirtualDataSource(Type shardingDbContextType)
{
return _virtualDataSources.Select(o => o.Value).ToList();
if (!_virtualDataSources.TryGetValue(shardingDbContextType, out var virtualDataSource))
throw new InvalidOperationException($"not found virtual data source sharding db context type:[{shardingDbContextType.FullName}]");
return virtualDataSource;
}
public void AddConnectEntities(string connectKey, Type entityType)
public IPhysicDataSource GetDefaultDataSource(Type shardingDbContextType)
{
if (!_shardingConnectKeys.ContainsKey(connectKey))
throw new ShardingCoreException("connectKey not init");
_shardingConnectKeys[connectKey].Add(entityType);
BuildIndex(connectKey, entityType);
var virtualDataSource = GetVirtualDataSource(shardingDbContextType);
return virtualDataSource.GetDefaultDataSource();
}
private void BuildIndex(string connectKey, Type entityType)
public string GetDefaultDataSourceName(Type shardingDbContextType)
{
if (_entityTypeConnectKeyIndex.ContainsKey(entityType))
{
_entityTypeConnectKeyIndex[entityType].Add(connectKey);
}
else
{
_entityTypeConnectKeyIndex.Add(entityType,new HashSet<string>(){ connectKey
});
}
var virtualDataSource = GetVirtualDataSource(shardingDbContextType);
return virtualDataSource.DefaultDataSourceName;
}
public IVirtualDataSource GetVirtualDataSource(Type shardingEntityType)
public IPhysicDataSource GetPhysicDataSource(Type shardingDbContextType, string dataSourceName)
{
if (!_virtualDataSources.TryGetValue(shardingEntityType, out var virtualTable) || virtualTable == null)
throw new VirtualDataSourceNotFoundException($"{shardingEntityType}");
return virtualTable;
}
public IVirtualDataSource<T> GetVirtualDataSource<T>() where T : class, IShardingDataSource
{
return (IVirtualDataSource<T>)GetVirtualDataSource(typeof(T));
}
public List<string> GetEntityTypeLinkedConnectKeys(Type shardingEntityType)
{
if (!_entityTypeConnectKeyIndex.ContainsKey(shardingEntityType))
throw new ShardingCoreException($"entity:[{shardingEntityType}] not found");
return _entityTypeConnectKeyIndex[shardingEntityType].ToList();
}
public void AddShardingConnectKey(string connectKey)
{
if (!_shardingConnectKeys.ContainsKey(connectKey))
_shardingConnectKeys.Add(connectKey,new HashSet<Type>());
}
public void AddVirtualDataSource(IVirtualDataSource virtualDataSource)
{
_virtualDataSources.TryAdd(virtualDataSource.EntityType, virtualDataSource);
throw new NotImplementedException();
}
}
}

View File

@ -23,11 +23,10 @@ namespace ShardingCore.Core.VirtualTables
/// 同数据库虚拟表
/// </summary>
/// <typeparam name="T"></typeparam>
public class DefaultVirtualTable<T> : IVirtualTable<T> where T : class
public class DefaultVirtualTable<T> : IVirtualTable<T> where T : class,IShardingTable
{
private readonly IVirtualTableRoute<T> _virtualTableRoute;
public string DSName { get; }
/// <summary>
/// 分表的对象类型
/// </summary>
@ -45,17 +44,14 @@ namespace ShardingCore.Core.VirtualTables
/// </summary>
public bool EnablePagination => PaginationMetadata != null;
public IVirtualDataSource VirtualDataSource { get; }
private readonly ConcurrentDictionary<IPhysicTable, object> _physicTables = new ConcurrentDictionary<IPhysicTable, object>();
public DefaultVirtualTable(string dsname,IVirtualDataSource<T> virtualDataSource,IVirtualTableRoute<T> virtualTableRoute)
public DefaultVirtualTable(IVirtualTableRoute<T> virtualTableRoute)
{
DSName = dsname;
_virtualTableRoute = virtualTableRoute;
EntityType = typeof(T);
ShardingConfig = ShardingKeyUtil.Parse(EntityType);
VirtualDataSource = virtualDataSource;
var paginationConfiguration = virtualTableRoute.CreatePaginationConfiguration();
if (paginationConfiguration != null)
{
@ -103,7 +99,7 @@ namespace ShardingCore.Core.VirtualTables
ShardingConfig.ShardingOriginalTable = originalTableName;
}
public string GetOriginalTableName()
public string GetVirtualTableName()
{
return ShardingConfig.ShardingOriginalTable;
}

View File

@ -4,11 +4,12 @@ using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Core.VirtualTables
namespace ShardingCore.Core.VirtualDatabase.VirtualTables
{
/*
* @Author: xjm
@ -19,170 +20,219 @@ namespace ShardingCore.Core.VirtualTables
/// <summary>
/// 同一个数据库下的虚拟表管理者
/// </summary>
public class DefaultVirtualTableManager : IVirtualTableManager
public class DefaultVirtualTableManager<TShardingDbContext> : IVirtualTableManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly IServiceProvider _serviceProvider;
//{sharidngDbContextType:{entityType,virtualTableType}}
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, IVirtualTable>> _shardingVirtualTables = new ConcurrentDictionary<Type, ConcurrentDictionary<Type, IVirtualTable>>();
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<string, IVirtualTable>> _shardingOriginalTaleVirtualTales = new ConcurrentDictionary<Type, ConcurrentDictionary<string, IVirtualTable>>();
public DefaultVirtualTableManager(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
//var shardingEntities = AssemblyHelper.CurrentDomain.GetAssemblies().SelectMany(o => o.GetTypes())
// .Where(type => !String.IsNullOrEmpty(type.Namespace))
// .Where(type => !type.IsAbstract&&type.GetInterfaces()
// .Any(it => it.IsInterface &&typeof(IShardingTable)==it)
// );
//foreach (var shardingEntity in shardingEntities)
//{
// Type genericType = typeof(IVirtualTable<>);
// Type interfaceType = genericType.MakeGenericType(shardingEntity);
// var virtualTable = (IVirtualTable)serviceProvider.GetService(interfaceType);
// _virtualTables.TryAdd(virtualTable.EntityType, virtualTable);
//}
}
private void CheckShardingDbContextType(Type shardingDbContextType, string dsname)
{
if (!shardingDbContextType.IsShardingDbContext())
throw new ShardingCoreException(
$"{shardingDbContextType.FullName} must impl {nameof(IShardingDbContext)}");
}
/// <summary>
/// {entityType,virtualTableType}
/// </summary>
private readonly ConcurrentDictionary<Type, IVirtualTable> _shardingVirtualTables = new ConcurrentDictionary<Type, IVirtualTable>();
private readonly ConcurrentDictionary<string, IVirtualTable> _shardingVirtualTaleVirtualTables = new ConcurrentDictionary<string, IVirtualTable>();
private void CheckShardingTableEntityType(string dsname, Type shardingEntityType)
public bool AddVirtualTable(IVirtualTable virtualTable)
{
var result = _shardingVirtualTables.TryAdd(virtualTable.EntityType, virtualTable);
_shardingVirtualTaleVirtualTables.TryAdd(virtualTable.GetVirtualTableName(), virtualTable);
return result;
}
/// <summary>
/// 获取对应的虚拟表
/// </summary>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
public IVirtualTable GetVirtualTable(Type shardingEntityType)
{
if (!shardingEntityType.IsShardingTable())
throw new ShardingCoreException(
$"{shardingEntityType.FullName} must impl {nameof(IShardingTable)}");
}
private string CreateShardingEntityTypeKey(Type shardingDbContextType, string dsname, Type entityType)
{
return $"{shardingDbContextType.FullName}{entityType.FullName}";
}
private string CreateShardingTableNameKey(Type shardingDbContextType, string dsname, string originalTableName)
{
return $"{shardingDbContextType.FullName}{originalTableName}";
}
public void AddVirtualTable(Type shardingDbContextType, string dsname, IVirtualTable virtualTable)
{
CheckShardingDbContextType(shardingDbContextType, dsname);
var innerShardingVirtualTables = _shardingVirtualTables.GetOrAdd(shardingDbContextType,
key => new ConcurrentDictionary<Type, IVirtualTable>());
if (!innerShardingVirtualTables.ContainsKey(virtualTable.EntityType))
{
innerShardingVirtualTables.TryAdd(virtualTable.EntityType, virtualTable);
}
var innerShardingOriginalTableVirtualTables = _shardingOriginalTaleVirtualTales.GetOrAdd(shardingDbContextType,type=>new ConcurrentDictionary<string, IVirtualTable>());
if (!innerShardingOriginalTableVirtualTables.ContainsKey(virtualTable.GetOriginalTableName()))
{
innerShardingOriginalTableVirtualTables.TryAdd(virtualTable.GetOriginalTableName(), virtualTable);
}
}
public IVirtualTable GetVirtualTable(Type shardingDbContextType, string dsname, Type shardingEntityType)
{
CheckShardingDbContextType(shardingDbContextType,dsname);
CheckShardingTableEntityType(dsname,shardingEntityType);
var shardingKey = CreateShardingEntityTypeKey(shardingDbContextType, dsname, shardingEntityType);
if(!_shardingVirtualTables.TryGetValue(shardingDbContextType,out var innerShardingVirtualTables) || innerShardingVirtualTables.IsEmpty())
throw new ShardingVirtualTableNotFoundException(shardingDbContextType.FullName);
if (!innerShardingVirtualTables.TryGetValue(shardingEntityType, out var virtualTable)||virtualTable==null)
throw new InvalidOperationException(shardingEntityType.FullName);
if (!_shardingVirtualTables.TryGetValue(shardingEntityType, out var virtualTable))
throw new ShardingVirtualTableNotFoundException(shardingEntityType.FullName);
return virtualTable;
}
public IVirtualTable<T> GetVirtualTable<TDbContext, T>(string dsname) where T : class, IShardingTable where TDbContext : DbContext, IShardingDbContext
public IVirtualTable TryGetVirtualTable(Type shardingEntityType)
{
return (IVirtualTable<T>)GetVirtualTable(typeof(TDbContext), dsname, typeof(T));
}
public IVirtualTable GetVirtualTable(Type shardingDbContextType, string dsname, string originalTableName)
{
CheckShardingDbContextType(shardingDbContextType, dsname);
if (!_shardingOriginalTaleVirtualTales.TryGetValue(shardingDbContextType, out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
throw new ShardingVirtualTableNotFoundException(shardingDbContextType.FullName);
if(!innerShardingOriginalTableVirtualTables.TryGetValue(originalTableName,out var virtualTable)|| virtualTable==null)
throw new ShardingVirtualTableNotFoundException(originalTableName);
return virtualTable;
}
public IVirtualTable GetVirtualTable<TDbContext>(string dsname, string originalTableName) where TDbContext : DbContext, IShardingDbContext
{
return GetVirtualTable(typeof(TDbContext), dsname,originalTableName);
}
public IVirtualTable TryGetVirtualTable(Type shardingDbContextType, string dsname, string originalTableName)
{
CheckShardingDbContextType(shardingDbContextType,dsname);
if (!_shardingOriginalTaleVirtualTales.TryGetValue(shardingDbContextType,
out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
return null;
if (!innerShardingOriginalTableVirtualTables.TryGetValue(originalTableName, out var virtualTable) || virtualTable == null)
if (!shardingEntityType.IsShardingTable())
throw new InvalidOperationException(shardingEntityType.FullName);
if (!_shardingVirtualTables.TryGetValue(shardingEntityType, out var virtualTable))
return null;
return virtualTable;
}
public IVirtualTable TryGetVirtualTablee<TDbContext>(string dsname, string originalTableName) where TDbContext : DbContext, IShardingDbContext
public IVirtualTable GetVirtualTable(string virtualTableName)
{
return TryGetVirtualTable(typeof(TDbContext), dsname, originalTableName);
if (!_shardingVirtualTaleVirtualTables.TryGetValue(virtualTableName, out var virtualTable))
throw new ShardingVirtualTableNotFoundException(virtualTableName);
return virtualTable;
}
public IVirtualTable TryGetVirtualTable(string virtualTableName)
{
if (!_shardingVirtualTaleVirtualTables.TryGetValue(virtualTableName, out var virtualTable))
return null;
return virtualTable;
}
public ISet<IVirtualTable> GetAllVirtualTables()
{
return _shardingVirtualTables.Values.ToHashSet();
}
public bool AddPhysicTable(IVirtualTable virtualTable, IPhysicTable physicTable)
{
return AddPhysicTable(virtualTable.EntityType, physicTable);
}
public bool AddPhysicTable(Type shardingEntityType, IPhysicTable physicTable)
{
if (!_shardingVirtualTables.TryGetValue(shardingEntityType, out var virtualTable))
throw new ShardingVirtualTableNotFoundException(shardingEntityType.FullName);
return virtualTable.AddPhysicTable(physicTable);
}
public List<IVirtualTable> GetAllVirtualTables(Type shardingDbContextType, string dsname)
{
if (!_shardingOriginalTaleVirtualTales.TryGetValue(shardingDbContextType,
out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
return new List<IVirtualTable>();
var keyPrefix = shardingDbContextType.FullName;
return innerShardingOriginalTableVirtualTables.Values.ToList();
}
public List<IVirtualTable> GetAllVirtualTables<TDbContext>(string dsname) where TDbContext : DbContext, IShardingDbContext
{
return GetAllVirtualTables(typeof(TDbContext),dsname);
}
public void AddPhysicTable(Type shardingDbContextType, string dsname, IVirtualTable virtualTable, IPhysicTable physicTable)
{
AddPhysicTable(shardingDbContextType, dsname, virtualTable.EntityType, physicTable);
}
public void AddPhysicTable<TDbContext>(string dsname, IVirtualTable virtualTable, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext
{
AddPhysicTable(typeof(TDbContext), dsname, virtualTable.EntityType, physicTable);
}
public void AddPhysicTable(Type shardingDbContextType, string dsname, Type shardingEntityType, IPhysicTable physicTable)
{
var virtualTable = GetVirtualTable(shardingDbContextType, dsname,shardingEntityType);
virtualTable.AddPhysicTable(physicTable);
}
public void AddPhysicTable<TDbContext>(string dsname, Type shardingEntityType, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext
{
var virtualTable = GetVirtualTable(typeof(TDbContext), dsname,shardingEntityType);
virtualTable.AddPhysicTable(physicTable);
}
///// <summary>
///// 是否是分表字段
///// {sharidngDbContextType:{entityType,virtualTableType}}
///// </summary>
///// <param name="shardingEntityType"></param>
///// <param name="shardingField"></param>
///// <returns></returns>
//public bool IsShardingKey(Type shardingEntityType, string shardingField)
//private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, IVirtualTable>> _shardingVirtualTables = new ConcurrentDictionary<Type, ConcurrentDictionary<Type, IVirtualTable>>();
//private readonly ConcurrentDictionary<Type, ConcurrentDictionary<string, IVirtualTable>> _shardingVirtualTaleVirtualTables = new ConcurrentDictionary<Type, ConcurrentDictionary<string, IVirtualTable>>();
//public DefaultVirtualTableManager()
//{
// return _virtualTables.TryGetValue(shardingEntityType, out var virtualTable) && virtualTable.ShardingConfigOption.ShardingField == shardingField;
//}
//private void CheckShardingDbContextType(Type shardingDbContextType)
//{
// if (!shardingDbContextType.IsShardingDbContext())
// throw new ShardingCoreException(
// $"{shardingDbContextType.FullName} must impl {nameof(IShardingDbContext)}");
//}
//private void CheckShardingTableEntityType(Type shardingEntityType)
//{
// if (!shardingEntityType.IsShardingTable())
// throw new ShardingCoreException(
// $"{shardingEntityType.FullName} must impl {nameof(IShardingTable)}");
//}
//private string CreateShardingEntityTypeKey(Type shardingDbContextType,Type entityType)
//{
// return $"{shardingDbContextType.FullName}{entityType.FullName}";
//}
//private string CreateShardingTableNameKey(Type shardingDbContextType,string originalTableName)
//{
// return $"{shardingDbContextType.FullName}{originalTableName}";
//}
//public void AddVirtualTable(Type shardingDbContextType,IVirtualTable virtualTable)
//{
// CheckShardingDbContextType(shardingDbContextType);
// var innerShardingVirtualTables = _shardingVirtualTables.GetOrAdd(shardingDbContextType,
// key => new ConcurrentDictionary<Type, IVirtualTable>());
// if (!innerShardingVirtualTables.ContainsKey(virtualTable.EntityType))
// {
// innerShardingVirtualTables.TryAdd(virtualTable.EntityType, virtualTable);
// }
// var innerShardingOriginalTableVirtualTables = _shardingVirtualTaleVirtualTables.GetOrAdd(shardingDbContextType,type=>new ConcurrentDictionary<string, IVirtualTable>());
// if (!innerShardingOriginalTableVirtualTables.ContainsKey(virtualTable.GetVirtualTableName()))
// {
// innerShardingOriginalTableVirtualTables.TryAdd(virtualTable.GetVirtualTableName(), virtualTable);
// }
//}
//public IVirtualTable GetVirtualTable(Type shardingDbContextType,Type shardingEntityType)
//{
// CheckShardingDbContextType(shardingDbContextType);
// CheckShardingTableEntityType(shardingEntityType);
// var shardingKey = CreateShardingEntityTypeKey(shardingDbContextType, shardingEntityType);
// if(!_shardingVirtualTables.TryGetValue(shardingDbContextType,out var innerShardingVirtualTables) || innerShardingVirtualTables.IsEmpty())
// throw new ShardingVirtualTableNotFoundException(shardingDbContextType.FullName);
// if (!innerShardingVirtualTables.TryGetValue(shardingEntityType, out var virtualTable)||virtualTable==null)
// throw new ShardingVirtualTableNotFoundException(shardingEntityType.FullName);
// return virtualTable;
//}
//public IVirtualTable<T> GetVirtualTable<TDbContext, T>() where T : class, IShardingTable where TDbContext : DbContext, IShardingDbContext
//{
// return (IVirtualTable<T>)GetVirtualTable(typeof(TDbContext), typeof(T));
//}
//public IVirtualTable GetVirtualTable(Type shardingDbContextType, string originalTableName)
//{
// CheckShardingDbContextType(shardingDbContextType);
// if (!_shardingVirtualTaleVirtualTables.TryGetValue(shardingDbContextType, out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
// throw new ShardingVirtualTableNotFoundException(shardingDbContextType.FullName);
// if(!innerShardingOriginalTableVirtualTables.TryGetValue(originalTableName,out var virtualTable)|| virtualTable==null)
// throw new ShardingVirtualTableNotFoundException(originalTableName);
// return virtualTable;
//}
//public IVirtualTable GetVirtualTable<TDbContext>(string originalTableName) where TDbContext : DbContext, IShardingDbContext
//{
// return GetVirtualTable(typeof(TDbContext),originalTableName);
//}
//public IVirtualTable TryGetVirtualTable(Type shardingDbContextType,string originalTableName)
//{
// CheckShardingDbContextType(shardingDbContextType);
// if (!_shardingVirtualTaleVirtualTables.TryGetValue(shardingDbContextType,
// out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
// return null;
// if (!innerShardingOriginalTableVirtualTables.TryGetValue(originalTableName, out var virtualTable) || virtualTable == null)
// return null;
// return virtualTable;
//}
//public IVirtualTable TryGetVirtualTablee<TDbContext>(string originalTableName) where TDbContext : DbContext, IShardingDbContext
//{
// return TryGetVirtualTable(typeof(TDbContext), originalTableName);
//}
//public List<IVirtualTable> GetAllVirtualTables(Type shardingDbContextType)
//{
// if (!_shardingVirtualTaleVirtualTables.TryGetValue(shardingDbContextType,
// out var innerShardingOriginalTableVirtualTables) || innerShardingOriginalTableVirtualTables.IsEmpty())
// return new List<IVirtualTable>();
// var keyPrefix = shardingDbContextType.FullName;
// return innerShardingOriginalTableVirtualTables.Values.ToList();
//}
//public List<IVirtualTable> GetAllVirtualTables<TDbContext>() where TDbContext : DbContext, IShardingDbContext
//{
// return GetAllVirtualTables(typeof(TDbContext));
//}
//public void AddPhysicTable(Type shardingDbContextType,IVirtualTable virtualTable, IPhysicTable physicTable)
//{
// AddPhysicTable(shardingDbContextType, virtualTable.EntityType, physicTable);
//}
//public void AddPhysicTable<TDbContext>(IVirtualTable virtualTable, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext
//{
// AddPhysicTable(typeof(TDbContext),virtualTable.EntityType, physicTable);
//}
//public void AddPhysicTable(Type shardingDbContextType,Type shardingEntityType, IPhysicTable physicTable)
//{
// var virtualTable = GetVirtualTable(shardingDbContextType,shardingEntityType);
// virtualTable.AddPhysicTable(physicTable);
//}
//public void AddPhysicTable<TDbContext>(Type shardingEntityType, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext
//{
// var virtualTable = GetVirtualTable(typeof(TDbContext),shardingEntityType);
// virtualTable.AddPhysicTable(physicTable);
//}
}
}

View File

@ -18,10 +18,6 @@ namespace ShardingCore.Core.VirtualTables
/// </summary>
public interface IVirtualTable
{
/// <summary>
/// 数据源名称
/// </summary>
string DSName { get; }
/// <summary>
/// 分表的类型
/// </summary>
@ -38,10 +34,6 @@ namespace ShardingCore.Core.VirtualTables
/// 是否启用分页配置
/// </summary>
bool EnablePagination { get; }
/// <summary>
/// 所属虚拟数据库
/// </summary>
IVirtualDataSource VirtualDataSource { get; }
/// <summary>
/// 获取所有的物理表
@ -73,7 +65,7 @@ namespace ShardingCore.Core.VirtualTables
/// 获取原始表名 get original table name
/// </summary>
/// <returns></returns>
string GetOriginalTableName();
string GetVirtualTableName();
/// <summary>
/// 获取当前虚拟表的路由 get this virtual table route
/// </summary>
@ -87,7 +79,7 @@ namespace ShardingCore.Core.VirtualTables
List<string> GetTaleAllTails();
}
public interface IVirtualTable<T> : IVirtualTable where T : class
public interface IVirtualTable<T> : IVirtualTable where T : class,IShardingTable
{
new IVirtualTableRoute<T> GetVirtualRoute();
}

View File

@ -2,108 +2,80 @@ using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Core.VirtualTables
namespace ShardingCore.Core.VirtualDatabase.VirtualTables
{
/*
* @Author: xjm
* @Description: api
* @Date: Friday, 18 December 2020 14:10:03
* @Email: 326308290@qq.com
*/
/// <summary>
/// 虚拟表管理者 virtual table manager
/// </summary>
/*
* @Author: xjm
* @Description: api
* @Date: Friday, 18 December 2020 14:10:03
* @Email: 326308290@qq.com
*/
public interface IVirtualTableManager
{
/// <summary>
/// 添加虚拟表应用启动时 add virtual table when app start
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="dsname">分表的dbcontext类型</param>
/// <param name="virtualTable">虚拟表</param>
void AddVirtualTable(Type shardingDbContextType,string dsname,IVirtualTable virtualTable);
bool AddVirtualTable(IVirtualTable virtualTable);
/// <summary>
/// 获取虚拟表 get virtual table by sharding entity type
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
IVirtualTable GetVirtualTable(Type shardingDbContextType, string dsname, Type shardingEntityType);
IVirtualTable GetVirtualTable(Type shardingEntityType);
/// <summary>
/// 获取虚拟表 get virtual table by sharding entity type
/// 尝试获取虚拟表
/// </summary>
/// <returns></returns>
IVirtualTable<T> GetVirtualTable<TDbContext,T>(string dsname) where T : class, IShardingTable where TDbContext : DbContext, IShardingDbContext;
/// <summary>
/// 获取虚拟表 get virtual table by original table name
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="originalTableName"></param>
/// <param name="shardingEntityType"></param>
/// <returns></returns>
IVirtualTable GetVirtualTable(Type shardingDbContextType, string dsname, string originalTableName);
IVirtualTable GetVirtualTable<TDbContext>(string dsname, string originalTableName) where TDbContext : DbContext, IShardingDbContext;
IVirtualTable TryGetVirtualTable(Type shardingEntityType);
/// <summary>
/// 获取虚拟表 get virtual table by actual table name
/// </summary>
/// <param name="virtualTableName"></param>
/// <returns></returns>
IVirtualTable GetVirtualTable(string virtualTableName);
/// <summary>
/// 尝试获取虚拟表没有返回null
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="originalTableName"></param>
/// <param name="virtualTableName"></param>
/// <returns></returns>
IVirtualTable TryGetVirtualTable(Type shardingDbContextType, string dsname, string originalTableName);
IVirtualTable TryGetVirtualTablee<TDbContext>(string dsname, string originalTableName) where TDbContext : DbContext, IShardingDbContext;
IVirtualTable TryGetVirtualTable(string virtualTableName);
/// <summary>
/// 获取所有的虚拟表 get all virtual table
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <returns></returns>
List<IVirtualTable> GetAllVirtualTables(Type shardingDbContextType, string dsname);
List<IVirtualTable> GetAllVirtualTables<TDbContext>(string dsname) where TDbContext : DbContext, IShardingDbContext;
ISet<IVirtualTable> GetAllVirtualTables();
/// <summary>
/// 添加物理表 add physic table
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="virtualTable"></param>
/// <param name="physicTable"></param>
void AddPhysicTable(Type shardingDbContextType,string dsname, IVirtualTable virtualTable, IPhysicTable physicTable);
void AddPhysicTable<TDbContext>(string dsname, IVirtualTable virtualTable, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext;
bool AddPhysicTable(IVirtualTable virtualTable, IPhysicTable physicTable);
/// <summary>
/// 添加物理表 add physic table
/// </summary>
/// <param name="shardingDbContextType">分表的dbcontext类型</param>
/// <param name="shardingEntityType"></param>
/// <param name="physicTable"></param>
void AddPhysicTable(Type shardingDbContextType, string dsname, Type shardingEntityType, IPhysicTable physicTable);
void AddPhysicTable<TDbContext>(string dsname, Type shardingEntityType, IPhysicTable physicTable) where TDbContext : DbContext, IShardingDbContext;
bool AddPhysicTable(Type shardingEntityType, IPhysicTable physicTable);
}
/// <summary>
/// 虚拟表管理者 virtual table manager
/// </summary>
public interface IVirtualTableManager<TShardingDbContext> : IVirtualTableManager where TShardingDbContext : DbContext, IShardingDbContext
{
///// <summary>
///// 添加物理表 add physic table
///// </summary>
///// <param name="virtualTable"></param>
///// <param name="physicTable"></param>
//void AddPhysicTable(IVirtualTable virtualTable, IPhysicTable physicTable);
///// <summary>
///// 添加物理表 add physic table
///// </summary>
///// <param name="shardingEntityType"></param>
///// <param name="physicTable"></param>
//void AddPhysicTable(Type shardingEntityType, IPhysicTable physicTable);
///// <summary>
///// 判断是否是分表字段
///// </summary>
///// <param name="shardingEntityType"></param>
///// <param name="shardingField"></param>
///// <returns></returns>
//bool IsShardingKey(Type shardingEntityType, string shardingField);
}
}

View File

@ -18,19 +18,19 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes
/// <summary>
/// 根据查询条件路由返回物理数据源
/// </summary>
/// <param name="allPhysicDataSources"></param>
/// <param name="queryable"></param>
/// <returns></returns>
List<IPhysicDataSource> RouteWithWhere(ISet<IPhysicDataSource> allPhysicDataSources,IQueryable queryable);
/// <returns>data source name</returns>
List<string> RouteWithWhere(IQueryable queryable);
/// <summary>
/// 根据值进行路由
/// </summary>
/// <param name="allPhysicDataSources"></param>
/// <param name="shardingKeyValue"></param>
/// <returns></returns>
IPhysicDataSource RouteWithValue(ISet<IPhysicDataSource> allPhysicDataSources, object shardingKeyValue);
/// <returns>data source name</returns>
string RouteWithValue(object shardingKeyValue);
ISet<string> GetAllDataSourceNames();
}
public interface IVirtualDataSourceRoute<T> : IVirtualDataSourceRoute where T : class

View File

@ -14,13 +14,13 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
*/
public class DataSourceRouteResult
{
public DataSourceRouteResult(ISet<IPhysicDataSource> intersectDataSources)
public DataSourceRouteResult(ISet<string> intersectDataSources)
{
IntersectDataSources = intersectDataSources;
}
/// <summary>
/// 交集
/// </summary>
public ISet<IPhysicDataSource> IntersectDataSources { get; }
public ISet<string> IntersectDataSources { get; }
}
}

View File

@ -3,8 +3,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDataSources;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -36,11 +36,11 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
{
if (!shardingDbContextType.IsShardingDataSource())
throw new InvalidOperationException($"{shardingDbContextType} must impl {nameof(IShardingDbContext)}");
var dataSourceMaps = new Dictionary<Type, ISet<IPhysicDataSource>>();
var dataSourceMaps = new Dictionary<Type, ISet<string>>();
var notShardingDataSourceEntityType = routeRuleContext.QueryEntities.FirstOrDefault(o => !o.IsShardingDataSource());
//存在不分表的
if (notShardingDataSourceEntityType != null)
dataSourceMaps.Add(notShardingDataSourceEntityType, new HashSet<IPhysicDataSource>() { _virtualDataSourceManager.GetDefaultDataSource(shardingDbContextType) });
dataSourceMaps.Add(notShardingDataSourceEntityType, new HashSet<string>() { _virtualDataSourceManager.GetDefaultDataSourceName(shardingDbContextType) });
var queryEntities = routeRuleContext.QueryEntities.Where(o => o.IsShardingDataSource()).ToList();
@ -48,8 +48,8 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
throw new ShardingCoreNotSupportedException($"{routeRuleContext.Queryable.ShardingPrint()}");
foreach (var queryEntity in queryEntities)
{
var virtualDataSource = _virtualDataSourceManager.GetVirtualDataSource(queryEntity);
var dataSourceConfigs = virtualDataSource.RouteTo(new ShardingDataSourceRouteConfig(routeRuleContext.Queryable));
var virtualDataSource = _virtualDataSourceManager.GetVirtualDataSource(shardingDbContextType);
var dataSourceConfigs = virtualDataSource.RouteTo(queryEntity,new ShardingDataSourceRouteConfig(routeRuleContext.Queryable));
if (!dataSourceMaps.ContainsKey(queryEntity))
{
dataSourceMaps.Add(queryEntity, dataSourceConfigs.ToHashSet());

View File

@ -44,7 +44,7 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes
List<string> GetAllTails();
}
public interface IVirtualTableRoute<T> : IVirtualTableRoute where T : class
public interface IVirtualTableRoute<T> : IVirtualTableRoute where T : class,IShardingTable
{
/// <summary>
/// 返回null就是表示不开启分页配置

View File

@ -14,8 +14,8 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
*/
public interface ITableRouteRuleEngineFactory
{
TableRouteRuleContext<T> CreateContext<T>(string dsname, IQueryable<T> queryable);
IEnumerable<TableRouteResult> Route<T>(Type shardingDbContextType, string dsname, IQueryable<T> queryable);
TableRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable);
IEnumerable<TableRouteResult> Route<T>(Type shardingDbContextType,IQueryable<T> queryable);
IEnumerable<TableRouteResult> Route<T>(Type shardingDbContextType, TableRouteRuleContext<T> ruleContext);
}
}

View File

@ -1,4 +1,5 @@
using System.Linq;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualTables;
namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
@ -13,14 +14,12 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
{
private readonly IVirtualTableManager _virtualTableManager;
public TableRouteRuleContext(string dsname, IQueryable<T> queryable, IVirtualTableManager virtualTableManager)
public TableRouteRuleContext(IQueryable<T> queryable, IVirtualTableManager virtualTableManager)
{
Dsname = dsname;
Queryable = queryable;
_virtualTableManager = virtualTableManager;
}
public string Dsname { get; }
public IQueryable<T> Queryable { get; }
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;

View File

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Sharding.Abstractions;
@ -33,14 +34,14 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
/// <param name="dsname"></param>
/// <param name="queryable"></param>
/// <returns></returns>
public TableRouteRuleContext<T> CreateContext<T>(string dsname,IQueryable<T> queryable)
public TableRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable)
{
return new TableRouteRuleContext<T>(dsname,queryable, _virtualTableManager);
return new TableRouteRuleContext<T>(queryable, _virtualTableManager);
}
public IEnumerable<TableRouteResult> Route<T>(Type shardingDbContextType, string dsname, IQueryable<T> queryable)
public IEnumerable<TableRouteResult> Route<T>(Type shardingDbContextType, IQueryable<T> queryable)
{
var ruleContext = CreateContext<T>(dsname,queryable);
var ruleContext = CreateContext<T>(queryable);
return _tableRouteRuleEngine.Route(shardingDbContextType,ruleContext);
}

View File

@ -17,6 +17,7 @@ using ShardingCore.Core.QueryRouteManagers;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.ShardingPage;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;

View File

@ -7,16 +7,17 @@ using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.DbContexts
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 24 December 2020 08:22:23
* @Email: 326308290@qq.com
*/
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 24 December 2020 08:22:23
* @Email: 326308290@qq.com
*/
public interface IShardingDbContextFactory
{
DbContext Create(Type shardingDbContextType,ShardingDbContextOptions shardingDbContextOptions);
DbContext Create<TShardingDbContext>(ShardingDbContextOptions shardingDbContextOptions) where TShardingDbContext:DbContext,IShardingDbContext;
//DbContext Create(DbConnection dbConnection,string tail);
DbContext Create(ShardingDbContextOptions shardingDbContextOptions);
}
public interface IShardingDbContextFactory<TShardingDbContext>: IShardingDbContextFactory where TShardingDbContext:DbContext,IShardingDbContext
{
}
}

View File

@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.EFCores
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/5 20:37:36
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingRelationalTransaction : RelationalTransaction
{
private readonly IShardingTransaction _shardingDbContext;
public ShardingRelationalTransaction(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned)
{
_shardingDbContext = (IShardingDbContext)null;
_shardingDbContext.UseShardingTransaction(transaction);
}
protected override void ClearTransaction()
{
base.ClearTransaction();
_shardingDbContext.UseShardingTransaction(null);
}
protected override async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
await base.ClearTransactionAsync(cancellationToken);
_shardingDbContext.UseShardingTransaction(null);
}
public override void Commit()
{
base.Commit();
}
public override Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
return base.CommitAsync(cancellationToken);
}
public override void Rollback()
{
base.Rollback();
}
public override Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
{
return base.RollbackAsync(cancellationToken);
}
}
}

View File

@ -5,6 +5,7 @@ using System.Text;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts.ShardingDbContexts;

View File

@ -13,6 +13,7 @@ using Microsoft.EntityFrameworkCore.Storage.Internal;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.EFCores
{
@ -27,11 +28,14 @@ namespace ShardingCore.EFCores
{
private readonly IRelationalConnection _relationalConnection;
private readonly ISupportShardingTransaction _supportShardingTransaction;
public ShardingRelationalConnection(IRelationalConnection _relationalConnection, DbTransaction transaction)
{
this._relationalConnection = _relationalConnection;
((IShardingTransaction)Context).UseShardingTransaction(transaction);
if (Context is ISupportShardingTransaction supportShardingTransaction)
{
_supportShardingTransaction = supportShardingTransaction;
}
}
public void ResetState()
@ -47,46 +51,58 @@ namespace ShardingCore.EFCores
public IDbContextTransaction BeginTransaction()
{
return _relationalConnection.BeginTransaction();
var dbContextTransaction = _relationalConnection.BeginTransaction();
_supportShardingTransaction?.BeginTransaction();
return dbContextTransaction;
}
public Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
public async Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.BeginTransactionAsync(cancellationToken);
var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(cancellationToken);
_supportShardingTransaction?.BeginTransaction();
return dbContextTransaction;
}
public void CommitTransaction()
{
_relationalConnection.CommitTransaction();
_supportShardingTransaction?.Commit();
}
public void RollbackTransaction()
{
_relationalConnection.RollbackTransaction();
_supportShardingTransaction?.Rollback();
}
#if EFCORE5
public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId);
((IShardingTransaction)Context).UseShardingTransaction(transaction);
_supportShardingTransaction?.UseTransaction(transaction);
return dbContextTransaction;
}
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
CancellationToken cancellationToken = new CancellationToken())
{
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
((IShardingTransaction)Context).UseShardingTransaction(transaction);
_supportShardingTransaction?.UseTransaction(transaction);
return dbContextTransaction;
}
public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
public async Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.CommitTransactionAsync(cancellationToken);
await _relationalConnection.CommitTransactionAsync(cancellationToken);
if (_supportShardingTransaction != null)
await _supportShardingTransaction.CommitAsync(cancellationToken);
}
public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
public async Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.RollbackTransactionAsync(cancellationToken);
await _relationalConnection.RollbackTransactionAsync(cancellationToken);
if (_supportShardingTransaction != null)
await _supportShardingTransaction.RollbackAsync(cancellationToken);
}
#endif
@ -139,19 +155,23 @@ namespace ShardingCore.EFCores
public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
{
return _relationalConnection.BeginTransaction(isolationLevel);
var dbContextTransaction = _relationalConnection.BeginTransaction(isolationLevel);
_supportShardingTransaction?.BeginTransaction(isolationLevel);
return dbContextTransaction;
}
public Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
public async Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
CancellationToken cancellationToken = new CancellationToken())
{
return _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
var dbContextTransaction = await _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
_supportShardingTransaction?.BeginTransaction(isolationLevel);
return dbContextTransaction;
}
public IDbContextTransaction UseTransaction(DbTransaction transaction)
{
var dbContextTransaction = _relationalConnection.UseTransaction(transaction);
((IShardingTransaction)Context).UseShardingTransaction(transaction);
_supportShardingTransaction?.UseTransaction(transaction);
return dbContextTransaction;
}

View File

@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/5 20:37:36
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
//public class ShardingRelationalTransaction: RelationalTransaction
//{
// private readonly IShardingDbContext _shardingDbContext;
// public ShardingRelationalTransaction(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned)
// {
// _shardingDbContext = (IShardingDbContext)null;
// _shardingDbContext.UseShardingTransaction(transaction);
// }
// protected override void ClearTransaction()
// {
// base.ClearTransaction();
// _shardingDbContext.UseShardingTransaction(null);
// }
// protected override async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
// {
// await base.ClearTransactionAsync(cancellationToken);
// _shardingDbContext.UseShardingTransaction(null);
// }
//}
}

View File

@ -78,7 +78,7 @@ namespace ShardingCore.Extensions
// /// <returns></returns>
// public static List<VirtualTableDbContextConfig> GetVirtualTableDbContextConfigs(this List<IVirtualTable> virtualTables)
// {
// return virtualTables.Select(o => new VirtualTableDbContextConfig(o.EntityType, o.GetOriginalTableName(), o.ShardingConfigOption.TailPrefix)).ToList();
// return virtualTables.Select(o => new VirtualTableDbContextConfig(o.EntityType, o.GetVirtualTableName(), o.ShardingConfigOption.TailPrefix)).ToList();
// }
/// <summary>
/// 是否是集合contains方法

View File

@ -84,5 +84,16 @@ namespace ShardingCore.Extensions
return property != null;
}
public static Type GetGenericType0(this Type genericType,Type arg0Type)
{
return genericType.MakeGenericType(arg0Type);
}
public static Type GetGenericType1(this Type genericType,Type arg0Type, Type arg1Type)
{
return genericType.MakeGenericType(arg0Type, arg1Type);
}
}
}

View File

@ -7,6 +7,7 @@ using System.Text;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualTables;

View File

@ -5,7 +5,7 @@ using System.Text;
using ShardingCore.Core;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualTables;
@ -70,12 +70,12 @@ namespace ShardingCore.Extensions
public static string GetTableTail<TEntity>(this IVirtualTableManager virtualTableManager, Type shardingDbContextType,string dsname,
public static string GetTableTail<TEntity>(this IVirtualTableManager virtualTableManager,
TEntity entity) where TEntity : class
{
if (entity.IsShardingTable())
return string.Empty;
var physicTable = virtualTableManager.GetVirtualTable(shardingDbContextType,dsname, entity.GetType()).RouteTo(new ShardingTableRouteConfig(null, entity as IShardingTable, null))[0];
var physicTable = virtualTableManager.GetVirtualTable(entity.GetType()).RouteTo(new ShardingTableRouteConfig(null, entity as IShardingTable, null))[0];
return physicTable.Tail;
}
}

View File

@ -7,7 +7,6 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Sharding.Abstractions;
@ -22,31 +21,21 @@ namespace ShardingCore.Extensions
*/
public static class VirtualDataSourceManagerExtension
{
public static IVirtualDataSource<TEntity> GetVirtualDataSource<TShardingDbContext,TEntity>(this IVirtualDataSourceManager virtualDataSourceManager)
where TShardingDbContext:DbContext,IShardingDbContext
where TEntity : class,IShardingDataSource
public static string GetDataSourceName<TEntity>(this IVirtualDataSourceManager virtualDataSourceManager,TEntity entity) where TEntity : class
{
return (IVirtualDataSource<TEntity>)virtualDataSourceManager.GetVirtualDataSource(typeof(TShardingDbContext), typeof(TEntity));
}
public static string GetDataSourceName<TEntity>(this IVirtualDataSourceManager virtualDataSourceManager, Type shardingDbContextType, TEntity entity) where TEntity : class
{
return virtualDataSourceManager.GetPhysicDataSource(shardingDbContextType, entity).DSName;
}
public static IPhysicDataSource GetPhysicDataSource<TEntity>(this IVirtualDataSourceManager virtualDataSourceManager, Type shardingDbContextType, TEntity entity) where TEntity : class
{
var type = entity.GetType();
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource();
if (!entity.IsShardingDataSource())
return virtualDataSourceManager.GetDefaultDataSource(shardingDbContextType);
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(type);
return virtualDataSource.RouteTo(
return virtualDataSource.DefaultDataSourceName;
return virtualDataSource.RouteTo(typeof(TEntity),
new ShardingDataSourceRouteConfig(shardingDataSource: entity as IShardingDataSource))[0];
}
public static List<string> GetDataSourceNames<TEntity>(this IVirtualDataSourceManager virtualDataSourceManager, Type shardingDbContextType, Expression<Func<TEntity, bool>> where)
public static List<string> GetDataSourceNames<TEntity>(this IVirtualDataSourceManager virtualDataSourceManager,Expression<Func<TEntity, bool>> where)
where TEntity : class
{
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(shardingDbContextType, typeof(TEntity));
return virtualDataSource.RouteTo(new ShardingDataSourceRouteConfig(predicate: where)).Select(o => o.DSName)
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource();
return virtualDataSource.RouteTo(typeof(TEntity),new ShardingDataSourceRouteConfig(predicate: where))
.ToList();
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore
{
@ -15,8 +16,10 @@ namespace ShardingCore
*/
public interface IShardingDbContextOptionsBuilderConfig
{
Type ShardingDbContextType { get; }
DbContextOptionsBuilder UseDbContextOptionsBuilder(DbConnection dbConnection, DbContextOptionsBuilder dbContextOptionsBuilder);
DbContextOptionsBuilder UseDbContextOptionsBuilder(string connectionString,DbContextOptionsBuilder dbContextOptionsBuilder);
DbContextOptionsBuilder UseDbContextOptionsBuilder(string connectionString, DbContextOptionsBuilder dbContextOptionsBuilder);
}
public interface IShardingDbContextOptionsBuilderConfig<TShardingDbContext> : IShardingDbContextOptionsBuilderConfig where TShardingDbContext:DbContext,IShardingDbContext
{
}
}

View File

@ -20,8 +20,11 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding
{
@ -34,204 +37,63 @@ namespace ShardingCore.Sharding
/// <summary>
/// 分表分库的dbcontext
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class AbstractShardingDbContext<T> : DbContext, IShardingDbContext<T>, IShardingTransaction, IShardingReadWriteSupport where T : DbContext, IShardingTableDbContext
/// <typeparam name="TDbContext"></typeparam>
public abstract class AbstractShardingDbContext<TDbContext> : DbContext, IShardingDbContext<TDbContext>, ISupportShardingTransaction, ISupportShardingReadWrite where TDbContext : DbContext
{
//private readonly ConcurrentDictionary<string, DbContext> _dbContextCaches = new ConcurrentDictionary<string, DbContext>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string,DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
private readonly IShardingConfigOption shardingConfigOption;
private readonly IVirtualDataSourceManager _virtualDataSourceManager;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IRouteTailFactory _routeTailFactory;
private readonly IShardingDbContextFactory _shardingDbContextFactory;
private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig;
private readonly IReadWriteOptions _readWriteOptions;
private readonly IConnectionStringManager _connectionStringManager;
private readonly IShardingDbContextExecutor _shardingDbContextExecutor;
public AbstractShardingDbContext(DbContextOptions options) : base(options)
{
_shardingDbContextFactory = ShardingContainer.GetService<IShardingDbContextFactory>();
_virtualDataSourceManager = ShardingContainer.GetService<IVirtualDataSourceManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
_routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
_shardingDbContextOptionsBuilderConfig = ShardingContainer
.GetService<IEnumerable<IShardingDbContextOptionsBuilderConfig>>()
.FirstOrDefault(o => o.ShardingDbContextType == ShardingDbContextType) ?? throw new ArgumentNullException(nameof(IShardingDbContextOptionsBuilderConfig));
ShardingDbContextType = this.GetType();
ActualDbContextType = typeof(TDbContext);
_connectionStringManager = ShardingContainer.GetService<IEnumerable<IConnectionStringManager>>()
.FirstOrDefault(o => o.ShardingDbContextType == ShardingDbContextType) ?? throw new ArgumentNullException(nameof(IConnectionStringManager));
_shardingDbContextExecutor =
(IShardingDbContextExecutor)Activator.CreateInstance(
typeof(ShardingDbContextExecutor<,>).GetGenericType1(ShardingDbContextType, ActualDbContextType));
shardingConfigOption = ShardingContainer.GetService<IEnumerable<IShardingConfigOption>>().FirstOrDefault(o => o.ShardingDbContextType == ShardingDbContextType && o.ActualDbContextType == typeof(T)) ?? throw new ArgumentNullException(nameof(IShardingConfigOption));
if (shardingConfigOption.UseReadWrite)
{
_readWriteOptions = ShardingContainer
.GetService<IEnumerable<IReadWriteOptions>>()
.FirstOrDefault(o => o.ShardingDbContextType == ShardingDbContextType) ?? throw new ArgumentNullException(nameof(IReadWriteOptions));
ReadWriteSupport = _readWriteOptions.ReadWriteSupport;
ReadWritePriority = _readWriteOptions.ReadWritePriority;
}
}
public abstract Type ShardingDbContextType { get; }
public Type ActualDbContextType => typeof(T);
//private ShardingDatabaseFacade _database;
//public override DatabaseFacade Database
//{
// get
// {
// return _database ?? (_database = new ShardingDatabaseFacade(this));
// }
//}
public int ReadWritePriority { get; set; }
public bool ReadWriteSupport { get; set; }
public ReadConnStringGetStrategyEnum GetReadConnStringGetStrategy()
public Type ShardingDbContextType { get; }
public Type ActualDbContextType { get; }
/// <summary>
/// 读写分离优先级
/// </summary>
public int ReadWriteSeparationPriority
{
return _readWriteOptions.ReadConnStringGetStrategy;
get => _shardingDbContextExecutor.ReadWriteSeparationPriority;
set => _shardingDbContextExecutor.ReadWriteSeparationPriority = value;
}
public string GetWriteConnectionString(string dsName)
/// <summary>
/// 是否使用读写分离
/// </summary>
public bool ReadWriteSeparation
{
return GetConnectionString(dsName);
get => _shardingDbContextExecutor.ReadWriteSeparation;
set => _shardingDbContextExecutor.ReadWriteSeparation = value;
}
public string GetConnectionString(string dsName)
public DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail)
{
return _virtualDataSourceManager.GetDataSource(dsName).ConnectionString;
return _shardingDbContextExecutor.CreateDbContext(parallelQuery, dataSourceName, routeTail);
}
private DbContextOptionsBuilder<T> CreateDbContextOptionBuilder()
{
Type type = typeof(DbContextOptionsBuilder<>);
type = type.MakeGenericType(ActualDbContextType);
return (DbContextOptionsBuilder<T>)Activator.CreateInstance(type);
}
private DbContextOptions<T> CreateShareDbContextOptions(string dsName)
{
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
if (!_dbContextCaches.TryGetValue(dsName, out var sameConnectionDbContexts))
{
sameConnectionDbContexts = new ConcurrentDictionary<string, DbContext>();
_dbContextCaches.TryAdd(dsName, sameConnectionDbContexts);
}
//存在使用相同的connection创建 第一次使用字符串创建
if (sameConnectionDbContexts.Any())
{
var dbConnection = sameConnectionDbContexts.First().Value.Database.GetDbConnection();
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(dbConnection, dbContextOptionBuilder);
}
else
{
var physicDataSource = _virtualDataSourceManager.GetDataSource(dsName);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(physicDataSource.ConnectionString, dbContextOptionBuilder);
}
return dbContextOptionBuilder.Options;
}
private ShardingDbContextOptions GetShareShardingDbContextOptions(string dsName,IRouteTail routeTail)
{
var dbContextOptions=CreateShareDbContextOptions(dsName);
return new ShardingDbContextOptions(dbContextOptions, routeTail);
}
private DbContextOptions<T> CreateParallelDbContextOptions(string dsName)
{
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
var connectionString = _connectionStringManager.GetConnectionString(dsName,this);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
return dbContextOptionBuilder.Options;
}
private ShardingDbContextOptions CetParallelShardingDbContextOptions(string dsName, IRouteTail routeTail)
{
return new ShardingDbContextOptions(CreateParallelDbContextOptions(dsName), routeTail);
}
public DbContext GetDbContext(string dsName,bool track, IRouteTail routeTail)
{
if (track)
{
if (routeTail.IsMultiEntityQuery())
throw new ShardingCoreNotSupportedException("multi route not support track");
if (!(routeTail is ISingleQueryRouteTail singleQueryRouteTail))
throw new ShardingCoreNotSupportedException("multi route not support track");
if (!_dbContextCaches.TryGetValue(dsName, out var tailDbContexts))
{
tailDbContexts = new ConcurrentDictionary<string, DbContext>();
_dbContextCaches.TryAdd(dsName, tailDbContexts);
}
var cacheKey = routeTail.GetRouteTailIdentity();
if (!tailDbContexts.TryGetValue(cacheKey, out var dbContext))
{
dbContext = _shardingDbContextFactory.Create(ShardingDbContextType, GetShareShardingDbContextOptions(dsName,routeTail));
if (IsBeginTransaction)
dbContext.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
tailDbContexts.TryAdd(cacheKey, dbContext);
}
return dbContext;
}
else
{
return _shardingDbContextFactory.Create(ShardingDbContextType, CetParallelShardingDbContextOptions(dsName,routeTail));
}
}
public bool IsBeginTransaction => Database.CurrentTransaction != null;
/// <summary>
/// 根据对象创建通用的dbcontext
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TEntity"></typeparam>
/// <param name="entity"></param>
/// <returns></returns>
public DbContext CreateGenericDbContext<T>(T entity) where T : class
public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
{
var dsname = _virtualDataSourceManager.GetDataSourceName(ShardingDbContextType,entity);
var tail = _virtualTableManager.GetTableTail(ShardingDbContextType, dsname,entity);
return GetDbContext(dsname,true, _routeTailFactory.Create(tail));
return _shardingDbContextExecutor.CreateGenericDbContext(entity);
}
public IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> @where) where TEntity : class
public IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> where) where TEntity : class
{
var dsNames = _virtualDataSourceManager.GetDataSourceNames(ShardingDbContextType, @where);
if (typeof(TEntity).IsShardingTable())
{
var resultDbContexts = new LinkedList<DbContext>();
foreach (var dsName in dsNames)
{
var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, dsName, typeof(TEntity)).RouteTo(new ShardingTableRouteConfig(predicate: @where));
if (physicTable.IsEmpty())
throw new ShardingCoreException($"{@where.ShardingPrint()} cant found ant physic table");
var dbContexts = physicTable.Select(o => GetDbContext(dsName, true, _routeTailFactory.Create(o.Tail))).ToList();
foreach (var dbContext in dbContexts)
{
resultDbContexts.AddLast(dbContext);
}
}
return resultDbContexts;
}
else
{
return dsNames.Select(dsName => GetDbContext(dsName, true, _routeTailFactory.Create(string.Empty)));
}
return _shardingDbContextExecutor.CreateExpressionDbContext(where);
}
public void UseShardingTransaction(DbTransaction transaction)
{
throw new NotImplementedException();
//_dbContextCaches.Values.ForEach(o => o.Database.UseTransaction(transaction));
}
public override EntityEntry Add(object entity)
{
@ -479,6 +341,36 @@ namespace ShardingCore.Sharding
}
}
//protected virtual void ApplyShardingConcepts()
//{
// foreach (var entry in ChangeTracker.Entries().ToList())
// {
// ApplyShardingConcepts(entry);
// }
//}
//protected virtual void ApplyShardingConcepts(EntityEntry entry)
//{
// switch (entry.State)
// {
// case EntityState.Added:
// case EntityState.Modified:
// case EntityState.Deleted:
// ApplyShardingConceptsForEntity(entry);
// break;
// }
// //throw new ShardingCoreNotSupportedException($"entry.State:[{entry.State}]");
//}
//protected virtual void ApplyShardingConceptsForEntity(EntityEntry entry)
//{
// var genericDbContext = CreateGenericDbContext(entry.Entity);
// var entityState = entry.State;
// entry.State = EntityState.Unchanged;
// genericDbContext.Entry(entry.Entity).State = entityState;
//}
public override int SaveChanges()
{
return this.SaveChanges(true);
@ -486,64 +378,46 @@ namespace ShardingCore.Sharding
public override int SaveChanges(bool acceptAllChangesOnSuccess)
{
var isBeginTransaction = IsBeginTransaction;
//ApplyShardingConcepts();
int i = 0;
//如果是内部开的事务就内部自己消化
if (!isBeginTransaction)
if (!_shardingDbContextExecutor.IsBeginTransaction)
{
using (var tran = Database.BeginTransaction())
using (var tran = _shardingDbContextExecutor.BeginTransaction())
{
foreach (var dbContextCache in _dbContextCaches)
{
i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess);
}
tran.Commit();
i = _shardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
}
}
else
{
foreach (var dbContextCache in _dbContextCaches)
{
i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess);
}
i = _shardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
}
return i;
}
public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
{
return this.SaveChangesAsync(true,cancellationToken);
return this.SaveChangesAsync(true, cancellationToken);
}
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{
var isBeginTransaction = IsBeginTransaction;
//ApplyShardingConcepts();
int i = 0;
//如果是内部开的事务就内部自己消化
if (!isBeginTransaction)
if (!_shardingDbContextExecutor.IsBeginTransaction)
{
using (var tran = await Database.BeginTransactionAsync(cancellationToken))
using(var tran= _shardingDbContextExecutor.BeginTransaction())
{
foreach (var dbContextCache in _dbContextCaches)
{
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
await tran.CommitAsync();
i = await _shardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
await tran.CommitAsync(cancellationToken);
}
}
else
{
foreach (var dbContextCache in _dbContextCaches)
{
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
i = await _shardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
@ -552,36 +426,20 @@ namespace ShardingCore.Sharding
public override void Dispose()
{
foreach (var dbContextCache in _dbContextCaches)
{
try
{
dbContextCache.Value.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
_shardingDbContextExecutor.Dispose();
base.Dispose();
}
public override async ValueTask DisposeAsync()
{
foreach (var dbContextCache in _dbContextCaches)
{
try
{
await dbContextCache.Value.DisposeAsync();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
await _shardingDbContextExecutor.DisposeAsync();
await base.DisposeAsync();
}
public IShardingTransaction BeginTransaction(IsolationLevel isolationLevel = IsolationLevel.Unspecified)
{
return _shardingDbContextExecutor.BeginTransaction(isolationLevel);
}
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
namespace ShardingCore.Sharding.Abstractions
{
@ -14,7 +15,9 @@ namespace ShardingCore.Sharding.Abstractions
*/
public interface IConnectionStringManager
{
Type ShardingDbContextType { get; }
string GetConnectionString(string dsName,IShardingDbContext shardingDbContext);
string GetConnectionString(string dataSourceName);
}
public interface IConnectionStringManager<TShardingDbContext>: IConnectionStringManager where TShardingDbContext:DbContext,IShardingDbContext
{
}
}

View File

@ -3,6 +3,7 @@ using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq.Expressions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
namespace ShardingCore.Sharding.Abstractions
@ -26,11 +27,11 @@ namespace ShardingCore.Sharding.Abstractions
/// <summary>
/// create DbContext
/// </summary>
/// <param name="dsName">data source name</param>
/// <param name="track">true not care db context life, false need call dispose()</param>
/// <param name="dataSourceName">data source</param>
/// <param name="parallelQuery">true not care db context life, false need call dispose()</param>
/// <param name="routeTail"></param>
/// <returns></returns>
DbContext GetDbContext(string dsName,bool track,IRouteTail routeTail);
DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail);
/// <summary>
/// 创建通用的db context
/// </summary>
@ -48,12 +49,11 @@ namespace ShardingCore.Sharding.Abstractions
IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> where)
where TEntity : class;
string GetConnectionString(string dsName);
}
public interface IShardingDbContext<T> : IShardingDbContext where T : DbContext, IShardingTableDbContext
public interface IShardingDbContext<T> : IShardingDbContext where T : DbContext
{
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DbContexts;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 9:50:11
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingDbContextExecutor:IDisposable,IAsyncDisposable
{
IShardingTransaction CurrentShardingTransaction { get; }
bool IsBeginTransaction { get; }
/// <summary>
/// 读写分离优先级
/// </summary>
int ReadWriteSeparationPriority { get; set; }
/// <summary>
/// 是否开启读写分离
/// </summary>
bool ReadWriteSeparation { get; set; }
/// <summary>
/// create sharding db context options
/// </summary>
/// <param name="parallelQuery">this query has >1 connection query</param>
/// <param name="dataSourceName">data source name</param>
/// <param name="routeTail"></param>
/// <returns></returns>
DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail);
DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class;
IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> where)
where TEntity : class;
IShardingTransaction BeginTransaction(IsolationLevel isolationLevel=IsolationLevel.Unspecified);
Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,
CancellationToken cancellationToken = new CancellationToken());
int SaveChanges(bool acceptAllChangesOnSuccess);
void ClearTransaction();
Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -1,19 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
namespace ShardingCore.Sharding.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/6 8:41:50
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingTransaction
{
void UseShardingTransaction(DbTransaction transaction);
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Sharding.ReadWriteConfigurations;
namespace ShardingCore.Sharding.Abstractions
@ -12,11 +13,9 @@ namespace ShardingCore.Sharding.Abstractions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingReadWriteSupport
public interface ISupportShardingReadWrite
{
int ReadWritePriority { get; set; }
bool ReadWriteSupport { get; set; }
ReadConnStringGetStrategyEnum GetReadConnStringGetStrategy();
string GetWriteConnectionString(string dsName);
int ReadWriteSeparationPriority { get; set; }
bool ReadWriteSeparation { get; set; }
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 13:30:08
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface ISupportShardingTransaction
{
IShardingTransaction BeginTransaction(IsolationLevel isolationLevel=IsolationLevel.Unspecified);
}
}

View File

@ -0,0 +1,92 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
namespace ShardingCore.Sharding
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 16:57:56
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ActualConnectionStringManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly IConnectionStringManager _connectionStringManager;
private readonly IReadWriteOptions<TShardingDbContext> _readWriteOptions;
private readonly bool _useReadWriteSeparation;
private readonly IShardingReadWriteManager _shardingReadWriteManager;
private readonly IVirtualDataSource _virtualDataSource;
public int ReadWriteSeparationPriority { get; set; }
public bool ReadWriteSeparation { get; set; }
private string _cacheConnectionString;
public ActualConnectionStringManager()
{
_virtualDataSource=ShardingContainer.GetService<IVirtualDataSourceManager<TShardingDbContext>>().GetVirtualDataSource();
_connectionStringManager = ShardingContainer.GetService<IConnectionStringManager<TShardingDbContext>>();
_readWriteOptions = ShardingContainer.GetService<IReadWriteOptions<TShardingDbContext>>();
_shardingReadWriteManager = ShardingContainer.GetService<IShardingReadWriteManager>();
_useReadWriteSeparation = _connectionStringManager is ReadWriteConnectionStringManager<TShardingDbContext>;
}
public string GetConnectionString(string dataSourceName, bool isWrite)
{
if (isWrite)
return GetWriteConnectionString(dataSourceName);
if (!_useReadWriteSeparation)
{
return _connectionStringManager.GetConnectionString(dataSourceName);
}
else
{
return GetReadWriteSeparationConnectString(dataSourceName);
}
}
private string GetWriteConnectionString(string dataSourceName)
{
return _virtualDataSource.GetPhysicDataSource(dataSourceName).ConnectionString;
}
private string GetReadWriteSeparationConnectString(string dataSourceName)
{
var support = ReadWriteSeparation;
var shardingReadWriteContext = _shardingReadWriteManager.GetCurrent<TShardingDbContext>();
if (shardingReadWriteContext != null)
{
support = (ReadWriteSeparationPriority >= shardingReadWriteContext.DefaultPriority)
? ReadWriteSeparation
: shardingReadWriteContext.DefaultReadEnable;
}
if (support)
{
return GetReadWriteSeparationConnectString0(dataSourceName);
}
return GetWriteConnectionString(dataSourceName);
}
private string GetReadWriteSeparationConnectString0(string dataSourceName)
{
if (_readWriteOptions.ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestFirstTime)
{
if (_cacheConnectionString == null)
_cacheConnectionString = _connectionStringManager.GetConnectionString(dataSourceName);
return _cacheConnectionString;
}
else if (_readWriteOptions.ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestEveryTime)
{
return _connectionStringManager.GetConnectionString(dataSourceName);
}
else
{
throw new InvalidOperationException($"ReadWriteConnectionStringManager:{_readWriteOptions.ReadConnStringGetStrategy}");
}
}
}
}

View File

@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding
@ -13,13 +11,17 @@ namespace ShardingCore.Sharding
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class DefaultConnectionStringManager<TShardingDbContext>:IConnectionStringManager where TShardingDbContext:DbContext,IShardingDbContext
public class DefaultConnectionStringManager<TShardingDbContext> : IConnectionStringManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
public Type ShardingDbContextType => typeof(TShardingDbContext);
private readonly IVirtualDataSource _virtualDataSource;
public string GetConnectionString(IShardingDbContext shardingDbContext)
public DefaultConnectionStringManager(IVirtualDataSourceManager<TShardingDbContext> virtualDataSourceManager)
{
return shardingDbContext.GetConnectionString();
_virtualDataSource = virtualDataSourceManager.GetVirtualDataSource();
}
public string GetConnectionString(string dataSourceName)
{
return _virtualDataSource.GetPhysicDataSource(dataSourceName).ConnectionString;
}
}
}

View File

@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
{
@ -13,7 +15,6 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
*/
public interface IReadWriteOptions
{
Type ShardingDbContextType { get; }
/// <summary>
/// 默认读写配置优先级
/// </summary>
@ -24,4 +25,10 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
bool ReadWriteSupport { get; }
ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; }
}
public interface IReadWriteOptions<TShardingDbContext> : IReadWriteOptions
where TShardingDbContext : DbContext, IShardingDbContext
{
}
}

View File

@ -15,7 +15,12 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
*/
public interface IShardingConnectionStringResolver
{
Type ShardingDbContextType { get; }
string GetConnectionString();
string GetConnectionString(string dataSourceName);
}
public interface IShardingConnectionStringResolver<TShardingDbContext> : IShardingConnectionStringResolver
where TShardingDbContext : DbContext, IShardingDbContext
{
}
}

View File

@ -1,8 +1,11 @@
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ReadWriteConfigurations
{
@ -13,26 +16,26 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class LoopShardingConnectionStringResolver<TShardingDbContext> : IShardingConnectionStringResolver
public class LoopShardingConnectionStringResolver<TShardingDbContext> : IShardingConnectionStringResolver<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
public Type ShardingDbContextType => typeof(TShardingDbContext);
private readonly string[] _connectionStrings;
private readonly int _length;
private long _seed = 0;
public LoopShardingConnectionStringResolver(IEnumerable<string> connectionStrings)
private readonly ConcurrentDictionary<string, ReadWriteLoopConnector> _connectors =
new ConcurrentDictionary<string, ReadWriteLoopConnector>();
public LoopShardingConnectionStringResolver(IEnumerable<ReadWriteLoopConnector> connectors)
{
_connectionStrings = connectionStrings.ToArray();
_length = _connectionStrings.Length;
}
public string GetConnectionString()
{
Interlocked.Increment(ref _seed);
var next = (int)(_seed % _length);
if (next < 0)
return _connectionStrings[Math.Abs(next)];
return _connectionStrings[next];
var enumerator = connectors.GetEnumerator();
while (enumerator.MoveNext())
{
var currentConnector = enumerator.Current;
if (currentConnector != null)
_connectors.TryAdd(currentConnector.DataSourceName, currentConnector);
}
}
public string GetConnectionString(string dataSourceName)
{
if (!_connectors.TryGetValue(dataSourceName, out var connector))
throw new InvalidOperationException($"read write connector not found, data source name:[{dataSourceName}]");
return connector.GetConnectionString();
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
@ -15,57 +16,19 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ReadWriteConnectionStringManager<TShardingDbContext> : IConnectionStringManager where TShardingDbContext : DbContext, IShardingDbContext
public class ReadWriteConnectionStringManager<TShardingDbContext> : IConnectionStringManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly IShardingReadWriteManager _shardingReadWriteManager;
public Type ShardingDbContextType => typeof(TShardingDbContext);
private IShardingConnectionStringResolver _shardingConnectionStringResolver;
private string _cacheConnectionString;
public ReadWriteConnectionStringManager(IShardingReadWriteManager shardingReadWriteManager, IEnumerable<IShardingConnectionStringResolver> shardingConnectionStringResolvers)
public ReadWriteConnectionStringManager(IShardingConnectionStringResolver<TShardingDbContext> shardingConnectionStringResolver)
{
_shardingReadWriteManager = shardingReadWriteManager;
_shardingConnectionStringResolver = shardingConnectionStringResolvers.FirstOrDefault(o => o.ShardingDbContextType == ShardingDbContextType) ?? throw new ArgumentNullException($"{ShardingDbContextType.FullName}:{nameof(shardingConnectionStringResolvers)}");
_shardingConnectionStringResolver = shardingConnectionStringResolver;
}
public string GetConnectionString(string dsName,IShardingDbContext shardingDbContext)
public string GetConnectionString(string dataSourceName)
{
if (!(shardingDbContext is IShardingReadWriteSupport shardingReadWriteSupport))
{
return shardingDbContext.GetConnectionString(dsName);
}
var shardingReadWriteContext = _shardingReadWriteManager.GetCurrent(ShardingDbContextType);
var support = shardingReadWriteSupport.ReadWriteSupport;
if (shardingReadWriteContext != null)
{
support = (shardingReadWriteSupport.ReadWritePriority >= shardingReadWriteContext.DefaultPriority)
? shardingReadWriteSupport.ReadWriteSupport
: shardingReadWriteContext.DefaultReadEnable;
}
if (support)
{
return GetReadConnectionString0(shardingReadWriteSupport);
}
return shardingReadWriteSupport.GetWriteConnectionString(dsName);
}
private string GetReadConnectionString0(IShardingReadWriteSupport shardingReadWriteSupport)
{
var readConnStringGetStrategy = shardingReadWriteSupport.GetReadConnStringGetStrategy();
if (readConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestFirstTime)
{
if (_cacheConnectionString == null)
_cacheConnectionString = _shardingConnectionStringResolver.GetConnectionString();
return _cacheConnectionString;
}
else if (readConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestEveryTime)
{
return _shardingConnectionStringResolver.GetConnectionString();
}
else
{
throw new InvalidOperationException($"ReadWriteConnectionStringManager:{readConnStringGetStrategy}");
}
return _shardingConnectionStringResolver.GetConnectionString(dataSourceName);
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ShardingCore.Sharding.ReadWriteConfigurations
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 17:27:44
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ReadWriteLoopConnector
{
private readonly string[] _connectionStrings;
private readonly int _length;
private long _seed = 0;
public ReadWriteLoopConnector(string dataSourceName, IEnumerable<string> connectionStrings)
{
DataSourceName = dataSourceName;
_connectionStrings = connectionStrings.ToArray();
_length = _connectionStrings.Length;
}
public string DataSourceName { get; }
public string GetConnectionString()
{
Interlocked.Increment(ref _seed);
var next = (int)(_seed % _length);
if (next < 0)
return _connectionStrings[Math.Abs(next)];
return _connectionStrings[next];
}
}
}

View File

@ -0,0 +1,293 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 9:55:43
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
/// <summary>
/// DbContext执行者
/// </summary>
/// <typeparam name="TShardingDbContext"></typeparam>
public class ShardingDbContextExecutor<TShardingDbContext, TActualDbContext> : IShardingDbContextExecutor where TShardingDbContext : DbContext, IShardingDbContext where TActualDbContext : DbContext
{
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
public IShardingTransaction CurrentShardingTransaction { get; private set; }
private readonly IVirtualDataSourceManager _virtualDataSourceManager;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IShardingDbContextFactory _shardingDbContextFactory;
private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig;
private readonly IRouteTailFactory _routeTailFactory;
public int ReadWriteSeparationPriority
{
get => _actualConnectionStringManager.ReadWriteSeparationPriority;
set => _actualConnectionStringManager.ReadWriteSeparationPriority = value;
}
public bool ReadWriteSeparation
{
get => _actualConnectionStringManager.ReadWriteSeparation;
set => _actualConnectionStringManager.ReadWriteSeparation = value;
}
public bool IsBeginTransaction => CurrentShardingTransaction != null && CurrentShardingTransaction.IsBeginTransaction();
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
public ShardingDbContextExecutor()
{
_virtualDataSourceManager = ShardingContainer.GetService<IVirtualDataSourceManager<TShardingDbContext>>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
_shardingDbContextFactory = ShardingContainer.GetService<IShardingDbContextFactory<TShardingDbContext>>();
_shardingDbContextOptionsBuilderConfig = ShardingContainer.GetService<IShardingDbContextOptionsBuilderConfig<TShardingDbContext>>();
_routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
_actualConnectionStringManager = new ActualConnectionStringManager<TShardingDbContext>();
}
#region create db context
private DbContextOptionsBuilder<TActualDbContext> CreateDbContextOptionBuilder()
{
Type type = typeof(DbContextOptionsBuilder<>);
type = type.MakeGenericType(typeof(TActualDbContext));
return (DbContextOptionsBuilder<TActualDbContext>)Activator.CreateInstance(type);
}
private DbContextOptions<TActualDbContext> CreateShareDbContextOptions(string dataSourceName)
{
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
if (!_dbContextCaches.TryGetValue(dataSourceName, out var sameConnectionDbContexts))
{
sameConnectionDbContexts = new ConcurrentDictionary<string, DbContext>();
_dbContextCaches.TryAdd(dataSourceName, sameConnectionDbContexts);
}
//存在使用相同的connection创建 第一次使用字符串创建
if (sameConnectionDbContexts.Any())
{
var dbConnection = sameConnectionDbContexts.First().Value.Database.GetDbConnection();
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(dbConnection, dbContextOptionBuilder);
}
else
{
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, true);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
}
return dbContextOptionBuilder.Options;
}
private ShardingDbContextOptions GetShareShardingDbContextOptions(string dataSourceName, IRouteTail routeTail)
{
var dbContextOptions = CreateShareDbContextOptions(dataSourceName);
return new ShardingDbContextOptions(dbContextOptions, routeTail);
}
private DbContextOptions<TActualDbContext> CreateParallelDbContextOptions(string dataSourceName)
{
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, false);
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
return dbContextOptionBuilder.Options;
}
private ShardingDbContextOptions GetParallelShardingDbContextOptions(string dataSourceName, IRouteTail routeTail)
{
return new ShardingDbContextOptions(CreateParallelDbContextOptions(dataSourceName), routeTail);
}
public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail)
{
if (parallelQuery)
{
if (routeTail.IsMultiEntityQuery())
throw new ShardingCoreNotSupportedException("multi route not support track");
if (!(routeTail is ISingleQueryRouteTail singleQueryRouteTail))
throw new ShardingCoreNotSupportedException("multi route not support track");
if (!_dbContextCaches.TryGetValue(dataSourceName, out var tailDbContexts))
{
tailDbContexts = new ConcurrentDictionary<string, DbContext>();
_dbContextCaches.TryAdd(dataSourceName, tailDbContexts);
}
var cacheKey = routeTail.GetRouteTailIdentity();
if (!tailDbContexts.TryGetValue(cacheKey, out var dbContext))
{
dbContext = _shardingDbContextFactory.Create(GetShareShardingDbContextOptions(dataSourceName, routeTail));
if (IsBeginTransaction)
{
CurrentShardingTransaction.Use(dataSourceName, dbContext);
}
tailDbContexts.TryAdd(cacheKey, dbContext);
}
return dbContext;
}
else
{
return _shardingDbContextFactory.Create(GetParallelShardingDbContextOptions(dataSourceName, routeTail));
}
}
public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
{
var dataSourceName = _virtualDataSourceManager.GetDataSourceName(entity);
var tail = _virtualTableManager.GetTableTail(entity);
return CreateDbContext(true, dataSourceName, _routeTailFactory.Create(tail));
}
public IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> @where) where TEntity : class
{
var dataSourceNames = _virtualDataSourceManager.GetDataSourceNames(where);
if (typeof(TEntity).IsShardingTable())
{
var resultDbContexts = new LinkedList<DbContext>();
foreach (var dataSourceName in dataSourceNames)
{
var physicTables = _virtualTableManager.GetVirtualTable(typeof(TEntity)).RouteTo(new ShardingTableRouteConfig(predicate: where));
if (physicTables.IsEmpty())
throw new ShardingCoreException($"{where.ShardingPrint()} cant found ant physic table");
var dbContexts = physicTables.Select(o => CreateDbContext(true, dataSourceName, _routeTailFactory.Create(o.Tail))).ToList();
foreach (var dbContext in dbContexts)
{
resultDbContexts.AddLast(dbContext);
}
}
return resultDbContexts;
}
else
{
return dataSourceNames.Select(dataSourceName => CreateDbContext(true, dataSourceName, _routeTailFactory.Create(string.Empty)));
}
}
#endregion
#region transaction
public IShardingTransaction BeginTransaction(IsolationLevel isolationLevel = IsolationLevel.Unspecified)
{
if (CurrentShardingTransaction != null)
throw new InvalidOperationException("transaction already begin");
CurrentShardingTransaction = new ShardingTransaction(this);
CurrentShardingTransaction.BeginTransaction(isolationLevel);
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var keyValuePair in dbContextCache.Value)
{
CurrentShardingTransaction.Use(dbContextCache.Key, keyValuePair.Value);
}
}
return CurrentShardingTransaction;
}
public void ClearTransaction()
{
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var keyValuePair in dbContextCache.Value)
{
keyValuePair.Value.Database.UseTransaction(null);
}
}
this.CurrentShardingTransaction = null;
}
public async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
{
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var keyValuePair in dbContextCache.Value)
{
await keyValuePair.Value.Database.UseTransactionAsync(null, cancellationToken);
}
}
this.CurrentShardingTransaction = null;
}
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{
int i = 0;
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var tailDbContexts in dbContextCache.Value)
{
i += await tailDbContexts.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
}
return i;
}
public int SaveChanges(bool acceptAllChangesOnSuccess)
{
int i = 0;
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var tailDbContexts in dbContextCache.Value)
{
i += tailDbContexts.Value.SaveChanges(acceptAllChangesOnSuccess);
}
}
return i;
}
#endregion
public void Dispose()
{
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var tailDbContexts in dbContextCache.Value)
{
tailDbContexts.Value.Dispose();
}
}
}
public async ValueTask DisposeAsync()
{
foreach (var dbContextCache in _dbContextCaches)
{
foreach (var tailDbContexts in dbContextCache.Value)
{
await tailDbContexts.Value.DisposeAsync();
}
}
}
}
}

View File

@ -6,6 +6,7 @@ using System.Text;
using System.Threading;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;

View File

@ -0,0 +1,26 @@
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace ShardingCore.Sharding.ShardingTransactions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/6 8:41:50
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IShardingTransaction:IDisposable,IAsyncDisposable
{
bool IsBeginTransaction();
void BeginTransaction(IsolationLevel isolationLevel = IsolationLevel.Unspecified);
void Use(string dataSourceName,DbContext dbContext);
void Rollback();
Task RollbackAsync(CancellationToken cancellationToken=new CancellationToken());
void Commit();
Task CommitAsync(CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -0,0 +1,168 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ShardingTransactions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/18 13:02:32
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingTransaction: IShardingTransaction
{
private readonly IShardingDbContextExecutor _shardingDbContextExecutor;
private readonly ConcurrentDictionary<string, IDbContextTransaction> _dbContextTransactions =
new ConcurrentDictionary<string, IDbContextTransaction>();
private IsolationLevel isolationLevel = IsolationLevel.Unspecified;
private bool _isBeginTransaction=false;
public ShardingTransaction(IShardingDbContextExecutor shardingDbContextExecutor)
{
_shardingDbContextExecutor = shardingDbContextExecutor;
}
public bool IsBeginTransaction()
{
return _isBeginTransaction;
}
public void BeginTransaction(IsolationLevel isolationLevel)
{
if (_isBeginTransaction)
throw new InvalidOperationException("transaction is already begin");
_isBeginTransaction = true;
this.isolationLevel = isolationLevel;
}
public void Use(string dataSourceName, DbContext dbContext)
{
if (!_isBeginTransaction)
throw new InvalidOperationException("transaction is not begin");
if (!_dbContextTransactions.TryGetValue(dataSourceName, out var dbContextTransaction))
{
dbContextTransaction = dbContext.Database.BeginTransaction(isolationLevel);
var tryAdd = _dbContextTransactions.TryAdd(dataSourceName, dbContextTransaction);
if (!tryAdd)
throw new InvalidOperationException("append transaction error");
}
else
{
dbContext.Database.UseTransaction(dbContextTransaction.GetDbTransaction());
}
}
public void Rollback()
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
dbContextTransaction.Value.Rollback();
}
catch (Exception e)
{
Console.WriteLine($"rollback error:[{e}]");
}
}
this._shardingDbContextExecutor.ClearTransaction();
}
public async Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
await dbContextTransaction.Value.RollbackAsync(cancellationToken);
}
catch (Exception e)
{
Console.WriteLine($"rollback error:[{e}]");
}
}
await this._shardingDbContextExecutor.ClearTransactionAsync(cancellationToken);
}
public void Commit()
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
dbContextTransaction.Value.Commit();
}
catch (Exception e)
{
Console.WriteLine($"commit error:[{e}]");
}
}
this._shardingDbContextExecutor.ClearTransaction();
}
public async Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
await dbContextTransaction.Value.CommitAsync(cancellationToken);
}
catch (Exception e)
{
Console.WriteLine($"commit error:[{e}]");
}
}
await this._shardingDbContextExecutor.ClearTransactionAsync(cancellationToken);
}
public void Dispose()
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
dbContextTransaction.Value.Dispose();
}
catch (Exception e)
{
Console.WriteLine($"dispose error:[{e}]");
}
}
_dbContextTransactions.Clear();
}
public async ValueTask DisposeAsync()
{
foreach (var dbContextTransaction in _dbContextTransactions)
{
try
{
await dbContextTransaction.Value.DisposeAsync();
}
catch (Exception e)
{
Console.WriteLine($"dispose error:[{e}]");
}
}
_dbContextTransactions.Clear();
}
}
}

View File

@ -37,7 +37,7 @@ namespace ShardingCore.Sharding
public SelectContext SelectContext { get;}
public GroupByContext GroupByContext { get; }
//public IEnumerable<TableRouteResult> RouteResults { get; }
public IEnumerable<TableRouteResult> TableRouteResults { get; }
public DataSourceRouteResult DataSourceRouteResult { get; }
public StreamMergeContext(IQueryable<T> source,IShardingDbContext shardingDbContext, IDataSourceRouteRuleEngineFactory dataSourceRouteRuleEngineFactory, ITableRouteRuleEngineFactory tableTableRouteRuleEngineFactory, IRouteTailFactory routeTailFactory)
@ -57,6 +57,7 @@ namespace ShardingCore.Sharding
_reWriteSource = reWriteResult.ReWriteQueryable;
DataSourceRouteResult =
dataSourceRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
TableRouteResults= _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
//RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
}
//public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults,
@ -86,13 +87,13 @@ namespace ShardingCore.Sharding
/// <summary>
/// ´´½¨¶ÔÓ¦µÄdbcontext
/// </summary>
/// <param name="dsname">data source name</param>
/// <param name="dataSourceName">data source name</param>
/// <param name="tableRouteResult"></param>
/// <returns></returns>
public DbContext CreateDbContext(string dsname,TableRouteResult tableRouteResult)
public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult)
{
var routeTail = _routeTailFactory.Create(tableRouteResult);
return _shardingDbContext.GetDbContext(dsname,false, routeTail);
return _shardingDbContext.GetDbContext(dataSourceName, false, routeTail);
}
public IRouteTail Create(TableRouteResult tableRouteResult)
@ -134,10 +135,5 @@ namespace ShardingCore.Sharding
{
return _shardingDbContext;
}
public IEnumerable<TableRouteResult> GetTableRouteResults(string dsname)
{
return _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, dsname, _source);
}
}
}

View File

@ -83,19 +83,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
{
var dataSourceRouteResult = _mergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(physicDataSource =>
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
var dsname = physicDataSource.DSName;
var tableRouteResults = _mergeContext.GetTableRouteResults(dsname);
return tableRouteResults.Select(routeResult =>
return _mergeContext.TableRouteResults.Select(routeResult =>
{
return Task.Run(async () =>
{
try
{
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dsname, routeResult);
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
var queryResult = await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(dsname, routeResult, queryResult);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//}
}
catch (Exception e)

View File

@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(physicDataSource =>
{
var dsname = physicDataSource.DSName;
var dsname = physicDataSource.DataSourceName;
var tableRouteResults = StreamMergeContext.GetTableRouteResults(dsname);
return tableRouteResults.Select(routeResult =>
{

View File

@ -44,7 +44,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(physicDataSource =>
{
var dsname = physicDataSource.DSName;
var dsname = physicDataSource.DataSourceName;
return StreamMergeContext.GetTableRouteResults(dsname).Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(dsname,reverseOrderQueryable, routeResult);

View File

@ -21,8 +21,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
{
var physicDataSource = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First();
var routeResult = StreamMergeContext.GetTableRouteResults(physicDataSource.DSName).First();
var shardingDbContext = StreamMergeContext.CreateDbContext(physicDataSource.DSName,routeResult);
var routeResult = StreamMergeContext.GetTableRouteResults(physicDataSource.DataSourceName).First();
var shardingDbContext = StreamMergeContext.CreateDbContext(physicDataSource.DataSourceName,routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
if (async)

View File

@ -8,6 +8,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualTables;
@ -159,7 +160,7 @@ namespace ShardingCore
if (!shardingConfigOption.IgnoreCreateTableError.GetValueOrDefault())
{
_logger.LogWarning(
$"table :{virtualTable.GetOriginalTableName()}{shardingConfig.TailPrefix}{tail} will created.",e);
$"table :{virtualTable.GetVirtualTableName()}{shardingConfig.TailPrefix}{tail} will created.",e);
}
}
}

View File

@ -33,5 +33,15 @@ namespace ShardingCore
{
return Services.GetService(serviceType);
}
public static object GetGenericTypeService0(Type genericServiceType,Type genericArg0Type)
{
var serviceType = genericServiceType.MakeGenericType(genericArg0Type);
return GetService(serviceType);
}
public static object GetGenericTypeService1(Type genericServiceType, Type genericArg0Type, Type genericArg1Type)
{
var serviceType = genericServiceType.MakeGenericType(genericArg0Type, genericArg1Type);
return GetService(serviceType);
}
}
}

View File

@ -12,6 +12,7 @@ using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
namespace ShardingCore.TableCreator
@ -86,7 +87,7 @@ namespace ShardingCore.TableCreator
if (!shardingConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
{
_logger.LogWarning(
$"create table error maybe table:[{virtualTable.GetOriginalTableName()}{virtualTable.ShardingConfig.TailPrefix}{tail}]");
$"create table error maybe table:[{virtualTable.GetVirtualTableName()}{virtualTable.ShardingConfig.TailPrefix}{tail}]");
throw new ShardingCreateException(" create table error :", ex);
}
}