完成动态数据源,动态读写分离

This commit is contained in:
xuejiaming 2022-07-02 15:01:18 +08:00
parent d0420888bc
commit 91635d9451
6 changed files with 191 additions and 249 deletions

View File

@ -10,6 +10,7 @@ using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DynamicDataSources;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
@ -32,6 +33,7 @@ namespace ShardingCore.Core.RuntimeContexts
IQueryTracker GetQueryTracker();
IUnionAllMergeManager GetUnionAllMergeManager();
IShardingPageManager GetShardingPageManager();
IDataSourceInitializer GetDataSourceInitializer();
void GetOrCreateShardingRuntimeModel(DbContext dbContext);

View File

@ -13,6 +13,7 @@ using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DynamicDataSources;
using ShardingCore.Exceptions;
using ShardingCore.Logger;
using ShardingCore.Sharding.Abstractions;
@ -119,6 +120,11 @@ namespace ShardingCore.Core.RuntimeContexts
return GetRequiredService<IShardingPageManager>();
}
public IDataSourceInitializer GetDataSourceInitializer()
{
return GetRequiredService<IDataSourceInitializer>();
}
public void GetOrCreateShardingRuntimeModel(DbContext dbContext)
{
if (isInitModeled) return;

View File

@ -142,7 +142,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
/// <exception cref="ShardingCoreNotFoundException"></exception>
public IPhysicDataSource GetPhysicDataSource(string dataSourceName)
{
Check.NotNull(dataSourceName, "data source name is null,plz confirm IShardingBootstrapper.Star()");
Check.NotNull(dataSourceName, $"data source name is null,plz confirm {dataSourceName} add in virtual data source");
var dataSource = _physicDataSourcePool.TryGet(dataSourceName);
if (null == dataSource)
throw new ShardingCoreNotFoundException($"data source:[{dataSourceName}]");

View File

@ -9,8 +9,14 @@ using ShardingCore.TableCreator;
using System;
using System.Collections.Generic;
using System.Threading;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Logger;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.DynamicDataSources
{
@ -19,6 +25,8 @@ namespace ShardingCore.DynamicDataSources
private static readonly ILogger<DataSourceInitializer> _logger =
ShardingLoggerFactory.CreateLogger<DataSourceInitializer>();
private readonly IShardingProvider _shardingProvider;
private readonly IDbContextCreator _dbContextCreator;
private readonly IShardingRouteConfigOptions _routeConfigOptions;
private readonly IVirtualDataSource _virtualDataSource;
private readonly IRouteTailFactory _routeTailFactory;
@ -27,6 +35,8 @@ namespace ShardingCore.DynamicDataSources
private readonly IShardingTableCreator _tableCreator;
public DataSourceInitializer(
IShardingProvider shardingProvider,
IDbContextCreator dbContextCreator,
IShardingRouteConfigOptions routeConfigOptions,
IVirtualDataSource virtualDataSource,
IRouteTailFactory routeTailFactory,
@ -34,6 +44,8 @@ namespace ShardingCore.DynamicDataSources
IEntityMetadataManager entityMetadataManager,
IShardingTableCreator shardingTableCreator)
{
_shardingProvider = shardingProvider;
_dbContextCreator = dbContextCreator;
_routeConfigOptions = routeConfigOptions;
_virtualDataSource = virtualDataSource;
_routeTailFactory = routeTailFactory;
@ -42,125 +54,109 @@ namespace ShardingCore.DynamicDataSources
_tableCreator = shardingTableCreator;
}
public void InitConfigure(string dataSourceName)
public void InitConfigure(string dataSourceName,bool createDatabase,bool createTable)
{
// // var createDatabase = !needCreateDatabase.HasValue || needCreateDatabase.Value;
// //
// // if ((_routeConfigOptions.EnsureCreatedWithOutShardingTable || !isOnStart)&&createDatabase)
// // EnsureCreated(virtualDataSource, context, dataSourceName);
// // else if (_routeConfigOptions.CreateDataBaseOnlyOnStart.GetValueOrDefault()&& createDatabase)
// // {
// // EnsureCreateDataBaseOnly(context, dataSourceName);
// // }
// var createDatabase = !needCreateDatabase.HasValue || needCreateDatabase.Value;
//
// // var tableEnsureManager = virtualDataSource.ConfigurationParams.TableEnsureManager;
// // ////获取数据库存在的所有的表
// // var existTables = tableEnsureManager?.GetExistTables(context, dataSourceName) ??
// // new HashSet<string>();
// var allShardingEntities = _entityMetadataManager.GetAllShardingEntities();
// foreach (var entityType in allShardingEntities)
// if ((_routeConfigOptions.EnsureCreatedWithOutShardingTable || !isOnStart)&&createDatabase)
// EnsureCreated(virtualDataSource, context, dataSourceName);
// else if (_routeConfigOptions.CreateDataBaseOnlyOnStart.GetValueOrDefault()&& createDatabase)
// {
// //如果是默认数据源
// if (_virtualDataSource.IsDefault(dataSourceName))
// {
// if (_entityMetadataManager.IsShardingTable(entityType))
// {
// var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
// InitVirtualTable(virtualTable);
// }
// }
// else
// {
// //非默认数据源
// if (_entityMetadataManager.IsShardingDataSource(entityType))
// {
// var virtualDataSourceRoute = virtualDataSource.GetRoute(entityType);
// if (virtualDataSourceRoute.GetAllDataSourceNames().Contains(dataSourceName))
// {
// if (_entityMetadataManager.IsShardingTable(entityType))
// {
// var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
// //创建表
// InitVirtualTable(virtualTable);
// }
// }
// }
// }
//
// }
}
using (var shardingScope = _shardingProvider.CreateScope())
{
using (var shellDbContext = _dbContextCreator.GetShellDbContext(shardingScope.ServiceProvider))
{
var isDefault = _virtualDataSource.IsDefault(dataSourceName);
if (createDatabase)
{
EnsureCreated(isDefault,shellDbContext,dataSourceName);
}
// private void InitVirtualTable(IVirtualTable virtualTable)
// {
// foreach (var tail in virtualTable.GetVirtualRoute().GetTails())
// {
// var defaultPhysicTable = new DefaultPhysicTable(virtualTable, tail);
// virtualTable.AddPhysicTable(defaultPhysicTable);
// }
// }
//
// private bool NeedCreateTable(EntityMetadata entityMetadata)
// {
// if (entityMetadata.AutoCreateTable.HasValue)
// {
// if (entityMetadata.AutoCreateTable.Value)
// return entityMetadata.AutoCreateTable.Value;
// else
// {
// if (entityMetadata.AutoCreateDataSourceTable.HasValue)
// return entityMetadata.AutoCreateDataSourceTable.Value;
// }
// }
//
// if (entityMetadata.AutoCreateDataSourceTable.HasValue)
// {
// if (entityMetadata.AutoCreateDataSourceTable.Value)
// return entityMetadata.AutoCreateDataSourceTable.Value;
// else
// {
// if (entityMetadata.AutoCreateTable.HasValue)
// return entityMetadata.AutoCreateTable.Value;
// }
// }
//
// return _routeConfigOptions.CreateShardingTableOnStart.GetValueOrDefault();
// }
//
// private void EnsureCreated(IVirtualDataSource<TShardingDbContext> virtualDataSource, DbContext context,
// string dataSourceName)
// {
// if (context is IShardingDbContext shardingDbContext)
// {
// using (var dbContext =
// shardingDbContext.GetDbContext(dataSourceName, false,
// _routeTailFactory.Create(string.Empty, false)))
// {
// var isDefault = virtualDataSource.IsDefault(dataSourceName);
//
// if (isDefault)
// {
// dbContext.RemoveDbContextRelationModelThatIsShardingTable();
// }
// else
// {
// dbContext.RemoveDbContextAllRelationModelThatIsNoSharding();
// }
//
// dbContext.Database.EnsureCreated();
// }
// }
// }
//
// private void EnsureCreateDataBaseOnly(DbContext context, string dataSourceName)
// {
// if (context is IShardingDbContext shardingDbContext)
// {
// using (var dbContext = shardingDbContext.GetDbContext(dataSourceName, false,
// _routeTailFactory.Create(string.Empty, false)))
// {
// dbContext.RemoveDbContextAllRelationModel();
// dbContext.Database.EnsureCreated();
// }
// }
// }
if (createTable)
{
var allShardingEntities = _entityMetadataManager.GetAllShardingEntities();
foreach (var entityType in allShardingEntities)
{
//如果是默认数据源
if (_virtualDataSource.IsDefault(dataSourceName))
{
if (_entityMetadataManager.IsShardingTable(entityType))
{
var virtualTableRoute = _tableRouteManager.GetRoute(entityType);
CreateDataTable(dataSourceName, virtualTableRoute, new HashSet<string>());
}
}
else
{
//非默认数据源
if (_entityMetadataManager.IsShardingDataSource(entityType))
{
var virtualDataSourceRoute = _virtualDataSource.GetRoute(entityType);
if (virtualDataSourceRoute.GetAllDataSourceNames().Contains(dataSourceName))
{
if (_entityMetadataManager.IsShardingTable(entityType))
{
var virtualTableRoute = _tableRouteManager.GetRoute(entityType);
CreateDataTable(dataSourceName, virtualTableRoute, new HashSet<string>());
}
}
}
}
}
}
}
}
}
private void EnsureCreated(bool isDefault, DbContext context,
string dataSourceName)
{
if (context is IShardingDbContext shardingDbContext)
{
using (var dbContext =
shardingDbContext.GetDbContext(dataSourceName, false,
_routeTailFactory.Create(string.Empty, false)))
{
if (isDefault)
{
dbContext.RemoveDbContextRelationModelThatIsShardingTable();
}
else
{
dbContext.RemoveDbContextAllRelationModelThatIsNoSharding();
}
dbContext.Database.EnsureCreated();
}
}
else
{
throw new ShardingCoreInvalidOperationException(
$"{nameof(IDbContextCreator)}.{nameof(IDbContextCreator.GetShellDbContext)} db context type not impl {nameof(IShardingDbContext)}");
}
}
private void CreateDataTable(string dataSourceName, IVirtualTableRoute tableRoute, ISet<string> existTables)
{
var entityMetadata = tableRoute.EntityMetadata;
foreach (var tail in tableRoute.GetTails())
{
try
{
//添加物理表
if (!existTables.Contains(entityMetadata.LogicTableName))
_tableCreator.CreateTable(dataSourceName, entityMetadata.EntityType, tail);
}
catch (Exception e)
{
if (!_routeConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
{
_logger.LogWarning(e,
$"table :{entityMetadata.LogicTableName}{entityMetadata.TableSeparator}{tail} will created.");
}
}
}
}
}
}

View File

@ -1,20 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.Abstractions;

namespace ShardingCore.DynamicDataSources
{
public interface IDataSourceInitializer
{
/// <summary>
///
/// 动态初始化数据源仅创建
/// </summary>
/// <param name="dataSourceName"></param>
void InitConfigure( string dataSourceName);
/// <param name="createDatabase"></param>
/// <param name="createTable"></param>
void InitConfigure( string dataSourceName,bool createDatabase,bool createTable);
}
}

View File

@ -1,123 +1,66 @@
// using System;
// using System.Collections.Generic;
// using System.Linq;
// using System.Text;
// using System.Threading.Tasks;
// using Microsoft.EntityFrameworkCore;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
// using ShardingCore.DynamicDataSources;
// using ShardingCore.Exceptions;
// using ShardingCore.Sharding.Abstractions;
// using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
//
// namespace ShardingCore.Helpers
// {
// public class DynamicShardingHelper
// {
// private DynamicShardingHelper()
// {
// throw new InvalidOperationException($"{nameof(DynamicShardingHelper)} create instance");
// }
// /// <summary>
// /// 动态添加虚拟数据源配置
// /// </summary>
// /// <typeparam name="TShardingDbContext"></typeparam>
// /// <param name="configurationParams"></param>
// /// <returns></returns>
// public static bool DynamicAppendVirtualDataSourceConfig<TShardingDbContext>(
// IVirtualDataSourceConfigurationParams<TShardingDbContext> configurationParams)
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// var virtualDataSourceManager = ShardingContainer.GetRequiredVirtualDataSourceManager<TShardingDbContext>();
// if (virtualDataSourceManager.AddVirtualDataSource(configurationParams))
// {
// virtualDataSourceManager.SetDefaultIfMultiConfiguration();
// var dataSourceInitializer = ShardingContainer.GetService<IDataSourceInitializer<TShardingDbContext>>();
// var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(configurationParams.ConfigId);
// foreach (var dataSource in virtualDataSource.GetDataSources())
// {
// var dataSourceName = dataSource.Key;
// var connectionString = dataSource.Value;
// dataSourceInitializer.InitConfigure(virtualDataSource, dataSourceName, connectionString, false);
// }
//
// return true;
// }
//
// return false;
// }
// /// <summary>
// /// 动态添加数据源
// /// </summary>
// /// <typeparam name="TShardingDbContext"></typeparam>
// /// <param name="virtualDataSource"></param>
// /// <param name="dataSourceName"></param>
// /// <param name="connectionString"></param>
// /// <param name="createDatabase"></param>
// /// <param name="createTable"></param>
// public static void DynamicAppendDataSource<TShardingDbContext>(IVirtualDataSource<TShardingDbContext> virtualDataSource, string dataSourceName, string connectionString,bool? createDatabase=null,bool? createTable=null) where TShardingDbContext : DbContext, IShardingDbContext
// {
// var defaultDataSourceInitializer = ShardingContainer.GetService<IDataSourceInitializer<TShardingDbContext>>();
// virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
// defaultDataSourceInitializer.InitConfigure(virtualDataSource, dataSourceName, connectionString, false,createDatabase,createTable);
// }
// /// <summary>
// /// 动态添加数据源
// /// </summary>
// /// <typeparam name="TShardingDbContext"></typeparam>
// /// <param name="configId"></param>
// /// <param name="dataSourceName"></param>
// /// <param name="connectionString"></param>
// /// <param name="createDatabase"></param>
// /// <param name="createTable"></param>
// public static void DynamicAppendDataSource<TShardingDbContext>(string configId, string dataSourceName, string connectionString, bool? createDatabase = null, bool? createTable = null) where TShardingDbContext : DbContext, IShardingDbContext
// {
// var defaultDataSourceInitializer = ShardingContainer.GetService<IDataSourceInitializer<TShardingDbContext>>();
// var virtualDataSourceManager = ShardingContainer.GetService<IVirtualDataSourceManager<TShardingDbContext>>();
//
// var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(configId);
// virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
// defaultDataSourceInitializer.InitConfigure(virtualDataSource, dataSourceName, connectionString, false, createDatabase, createTable);
// }
//
// /// <summary>
// /// 动态添加读写分离链接字符串
// /// </summary>
// /// <typeparam name="TShardingDbContext"></typeparam>
// /// <param name="virtualDataSource"></param>
// /// <param name="dataSourceName"></param>
// /// <param name="connectionString"></param>
// /// <param name="readNodeName"></param>
// /// <exception cref="ShardingCoreInvalidOperationException"></exception>
// public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(IVirtualDataSource<TShardingDbContext> virtualDataSource, string dataSourceName,
// string connectionString, string readNodeName=null) where TShardingDbContext : DbContext, IShardingDbContext
// {
// if (virtualDataSource.ConnectionStringManager is IReadWriteConnectionStringManager
// readWriteAppendConnectionString)
// {
// readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString, readNodeName);
// return;
// }
//
// throw new ShardingCoreInvalidOperationException(
// $"{virtualDataSource.ConnectionStringManager.GetType()} cant support add read connection string");
// }
// /// <summary>
// /// 动态添加读写分离链接字符串
// /// </summary>
// /// <typeparam name="TShardingDbContext"></typeparam>
// /// <param name="configId"></param>
// /// <param name="dataSourceName"></param>
// /// <param name="connectionString"></param>
// /// <param name="readNodeName"></param>
// public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(string configId, string dataSourceName,
// string connectionString, string readNodeName = null) where TShardingDbContext : DbContext, IShardingDbContext
// {
// var virtualDataSourceManager = ShardingContainer.GetRequiredVirtualDataSourceManager<TShardingDbContext>();
// var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(configId);
// DynamicAppendReadWriteConnectionString(virtualDataSource, dataSourceName, connectionString, readNodeName);
// }
// }
// }
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.DynamicDataSources;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
namespace ShardingCore.Helpers
{
public class DynamicShardingHelper
{
private DynamicShardingHelper()
{
throw new InvalidOperationException($"{nameof(DynamicShardingHelper)} create instance");
}
/// <summary>
/// 动态添加数据源
/// </summary>
/// <typeparam name="TShardingDbContext"></typeparam>
/// <param name="shardingRuntimeContext"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="createDatabase"></param>
/// <param name="createTable"></param>
public static void DynamicAppendDataSource<TShardingDbContext>(IShardingRuntimeContext shardingRuntimeContext, string dataSourceName, string connectionString,bool createDatabase,bool createTable) where TShardingDbContext : DbContext, IShardingDbContext
{
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
var dataSourceInitializer = shardingRuntimeContext.GetDataSourceInitializer();
dataSourceInitializer.InitConfigure(dataSourceName,createDatabase,createTable);
}
/// <summary>
/// 动态添加读写分离链接字符串
/// </summary>
/// <typeparam name="TShardingDbContext"></typeparam>
/// <param name="shardingRuntimeContext"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(IShardingRuntimeContext shardingRuntimeContext, string dataSourceName,
string connectionString, string readNodeName=null) where TShardingDbContext : DbContext, IShardingDbContext
{
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
if (virtualDataSource.ConnectionStringManager is IReadWriteConnectionStringManager
readWriteAppendConnectionString)
{
readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString, readNodeName);
return;
}
throw new ShardingCoreInvalidOperationException(
$"{virtualDataSource.ConnectionStringManager.GetType()} cant support add read connection string");
}
}
}