修复启动分表bug,优化动态追加数据源表
This commit is contained in:
parent
1d3f36b388
commit
09bb6c2719
|
@ -1,9 +1,9 @@
|
|||
:start
|
||||
::定义版本
|
||||
set EFCORE2=2.3.1.77
|
||||
set EFCORE3=3.3.1.77
|
||||
set EFCORE5=5.3.1.77
|
||||
set EFCORE6=6.3.1.77
|
||||
set EFCORE2=2.3.1.78
|
||||
set EFCORE3=3.3.1.78
|
||||
set EFCORE5=5.3.1.78
|
||||
set EFCORE6=6.3.1.78
|
||||
|
||||
::删除所有bin与obj下的文件
|
||||
@echo off
|
||||
|
|
|
@ -1,152 +0,0 @@
|
|||
using System;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using ShardingCore;
|
||||
using ShardingCore.Core.EntityMetadatas;
|
||||
using ShardingCore.Core.PhysicTables;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualTables;
|
||||
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
|
||||
using ShardingCore.Core.VirtualTables;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.TableCreator;
|
||||
|
||||
namespace Sample.SqlServerShardingAll
|
||||
{
|
||||
public class DbContextHelper
|
||||
{
|
||||
public static void EnsureSubDbCreatedAndCreateSubTables(string dataSourceName, string connectionString, Type entityType, int sum4SubTable)
|
||||
{
|
||||
var _entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<MyDbContext>>();
|
||||
var _virtualDataSource = ShardingContainer.GetService<IVirtualDataSource<MyDbContext>>();
|
||||
var _virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<MyDbContext>>();
|
||||
var _tableCreator = ShardingContainer.GetService<IShardingTableCreator<MyDbContext>>();
|
||||
|
||||
using (var serviceScope = ShardingContainer.ServiceProvider.CreateScope())
|
||||
{
|
||||
_virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
|
||||
var virtualDataSourceRoute = _virtualDataSource.GetRoute(entityType);
|
||||
virtualDataSourceRoute.AddDataSourceName(dataSourceName);
|
||||
|
||||
using var context = (DbContext)serviceScope.ServiceProvider.GetService(typeof(MyDbContext));
|
||||
EnsureCreated(context, dataSourceName);
|
||||
foreach (var entity in context.Model.GetEntityTypes())
|
||||
{
|
||||
if (entity.ClrType != entityType)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (_entityMetadataManager.IsShardingTable(entityType))
|
||||
{
|
||||
var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
|
||||
//创建表
|
||||
CreateDataTable(dataSourceName, virtualTable, sum4SubTable);
|
||||
}
|
||||
else
|
||||
{
|
||||
_tableCreator.CreateTable(dataSourceName, entityType, string.Empty);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private static void CreateDataTable(string dataSourceName, IVirtualTable virtualTable, int sum4SubTable)
|
||||
{
|
||||
var _tableCreator = ShardingContainer.GetService<IShardingTableCreator<MyDbContext>>();
|
||||
var entityMetadata = virtualTable.EntityMetadata;
|
||||
int currentCount = 0;
|
||||
foreach (var tail in virtualTable.GetVirtualRoute().GetAllTails())
|
||||
{
|
||||
if (currentCount >= sum4SubTable)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (NeedCreateTable(entityMetadata))
|
||||
{
|
||||
try
|
||||
{
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
_tableCreator.CreateTable(dataSourceName, entityMetadata.EntityType, tail);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
//if (!_shardingConfigOption.IgnoreCreateTableError.GetValueOrDefault())
|
||||
//{
|
||||
// _logger.LogWarning(ex,
|
||||
// $"table :{virtualTable.GetVirtualTableName()}{entityMetadata.TableSeparator}{tail} will created.");
|
||||
//}
|
||||
//TODO: 记录异常日志
|
||||
System.Diagnostics.Trace.TraceError($"DbContextHelper-->CreateDataTable ERROR: {ex}");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
}
|
||||
currentCount++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static bool NeedCreateTable(EntityMetadata entityMetadata)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.Value)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
}
|
||||
}
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.Value)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
}
|
||||
}
|
||||
|
||||
//return _shardingConfigOption.CreateShardingTableOnStart.GetValueOrDefault();
|
||||
return true;
|
||||
}
|
||||
|
||||
private static void EnsureCreated(DbContext context, string dataSourceName)
|
||||
{
|
||||
var _routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
|
||||
|
||||
if (context is IShardingDbContext shardingDbContext)
|
||||
{
|
||||
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, _routeTailFactory.Create(string.Empty));
|
||||
|
||||
var modelCacheSyncObject = dbContext.GetModelCacheSyncObject();
|
||||
|
||||
var acquire = System.Threading.Monitor.TryEnter(modelCacheSyncObject, TimeSpan.FromSeconds(3));
|
||||
if (!acquire)
|
||||
{
|
||||
throw new ShardingCoreException("cant get modelCacheSyncObject lock");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
dbContext.RemoveDbContextRelationModelThatIsShardingTable();
|
||||
dbContext.Database.EnsureCreated();
|
||||
dbContext.RemoveModelCache();
|
||||
}
|
||||
finally
|
||||
{
|
||||
System.Threading.Monitor.Exit(modelCacheSyncObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@ using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
|
|||
using ShardingCore.Core.VirtualRoutes.TableRoutes;
|
||||
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
|
||||
using ShardingCore.Core.VirtualTables;
|
||||
using ShardingCore.DynamicDataSources;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Jobs;
|
||||
|
@ -60,6 +61,7 @@ namespace ShardingCore.Bootstrapers
|
|||
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
|
||||
private readonly IShardingTableCreator<TShardingDbContext> _tableCreator;
|
||||
private readonly IParallelTableManager<TShardingDbContext> _parallelTableManager;
|
||||
private readonly IDefaultDataSourceInitializer<TShardingDbContext> _dataSourceInitializer;
|
||||
private readonly ILogger<ShardingDbContextBootstrapper<TShardingDbContext>> _logger;
|
||||
|
||||
public ShardingDbContextBootstrapper(IShardingConfigOption shardingConfigOption)
|
||||
|
@ -71,6 +73,7 @@ namespace ShardingCore.Bootstrapers
|
|||
_tableCreator = ShardingContainer.GetService<IShardingTableCreator<TShardingDbContext>>();
|
||||
_virtualDataSource= ShardingContainer.GetService<IVirtualDataSource<TShardingDbContext>>();
|
||||
_parallelTableManager = ShardingContainer.GetService<IParallelTableManager<TShardingDbContext>>();
|
||||
_dataSourceInitializer = ShardingContainer.GetService<IDefaultDataSourceInitializer<TShardingDbContext>>();
|
||||
_logger = ShardingContainer.GetService<ILogger<ShardingDbContextBootstrapper<TShardingDbContext>>>();
|
||||
}
|
||||
/// <summary>
|
||||
|
@ -143,130 +146,10 @@ namespace ShardingCore.Bootstrapers
|
|||
var dataSources = _shardingConfigOption.GetDataSources();
|
||||
foreach (var dataSourceKv in dataSources)
|
||||
{
|
||||
using (var serviceScope = ShardingContainer.ServiceProvider.CreateScope())
|
||||
{
|
||||
var dataSourceName = dataSourceKv.Key;
|
||||
var connectionString = dataSourceKv.Value;
|
||||
_virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
|
||||
using var context =
|
||||
(DbContext)serviceScope.ServiceProvider.GetService(_shardingConfigOption.ShardingDbContextType);
|
||||
if (_shardingConfigOption.EnsureCreatedWithOutShardingTable)
|
||||
EnsureCreated(context, dataSourceName);
|
||||
foreach (var entity in context.Model.GetEntityTypes())
|
||||
{
|
||||
var entityType = entity.ClrType;
|
||||
|
||||
if (_entityMetadataManager.IsShardingTable(entityType))
|
||||
{
|
||||
var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
|
||||
//创建表
|
||||
CreateDataTable(dataSourceName, virtualTable);
|
||||
}
|
||||
else
|
||||
{
|
||||
if(_shardingConfigOption.NeedCreateTable(entityType))
|
||||
{
|
||||
_tableCreator.CreateTable(dataSourceName, entityType, string.Empty);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
var dataSourceName = dataSourceKv.Key;
|
||||
var connectionString = dataSourceKv.Value;
|
||||
_dataSourceInitializer.InitConfigure(dataSourceName, connectionString);
|
||||
}
|
||||
}
|
||||
private void CreateDataTable(string dataSourceName, IVirtualTable virtualTable)
|
||||
{
|
||||
var entityMetadata = virtualTable.EntityMetadata;
|
||||
foreach (var tail in virtualTable.GetVirtualRoute().GetAllTails())
|
||||
{
|
||||
if (NeedCreateTable(entityMetadata))
|
||||
{
|
||||
try
|
||||
{
|
||||
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
_tableCreator.CreateTable(dataSourceName, entityMetadata.EntityType, tail);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (!_shardingConfigOption.IgnoreCreateTableError.GetValueOrDefault())
|
||||
{
|
||||
_logger.LogWarning(e,
|
||||
$"table :{virtualTable.GetVirtualTableName()}{entityMetadata.TableSeparator}{tail} will created.");
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
private bool NeedCreateTable(EntityMetadata entityMetadata)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.Value)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
}
|
||||
}
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.Value)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
}
|
||||
}
|
||||
|
||||
return _shardingConfigOption.CreateShardingTableOnStart.GetValueOrDefault();
|
||||
}
|
||||
private void EnsureCreated(DbContext context, string dataSourceName)
|
||||
{
|
||||
if (context is IShardingDbContext shardingDbContext)
|
||||
{
|
||||
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, _routeTailFactory.Create(string.Empty));
|
||||
|
||||
var isDefault = _virtualDataSource.IsDefault(dataSourceName);
|
||||
|
||||
var modelCacheSyncObject = dbContext.GetModelCacheSyncObject();
|
||||
|
||||
var acquire = Monitor.TryEnter(modelCacheSyncObject, TimeSpan.FromSeconds(3));
|
||||
if (!acquire)
|
||||
{
|
||||
throw new ShardingCoreException("cant get modelCacheSyncObject lock");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if(isDefault)
|
||||
{
|
||||
dbContext.RemoveDbContextRelationModelThatIsShardingTable();
|
||||
}
|
||||
else
|
||||
{
|
||||
dbContext.RemoveDbContextAllRelationModelThatIsNoSharding();
|
||||
}
|
||||
dbContext.Database.EnsureCreated();
|
||||
dbContext.RemoveModelCache();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Monitor.Exit(modelCacheSyncObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,10 @@ using ShardingCore.Sharding.Abstractions;
|
|||
using ShardingCore.Sharding.ShardingQueryExecutors;
|
||||
using ShardingCore.TableCreator;
|
||||
using System;
|
||||
using ShardingCore.DynamicDataSources;
|
||||
using ShardingCore.Sharding.ParallelTables;
|
||||
using ShardingCore.Sharding.ReadWriteConfigurations;
|
||||
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
|
||||
|
||||
namespace ShardingCore
|
||||
{
|
||||
|
@ -86,6 +89,7 @@ namespace ShardingCore
|
|||
services.TryAddSingleton(typeof(IShardingDbContextCreatorConfig<>),typeof(DefaultShardingDbContextCreatorConfig<>));
|
||||
|
||||
|
||||
services.TryAddSingleton(typeof(IDefaultDataSourceInitializer<>),typeof(DefaultDataSourceInitializer<>));
|
||||
services.TryAddSingleton(typeof(ITrackerManager<>),typeof(TrackerManager<>));
|
||||
services.TryAddSingleton(typeof(IStreamMergeContextFactory<>),typeof(StreamMergeContextFactory<>));
|
||||
services.TryAddSingleton(typeof(IShardingTableCreator<>),typeof(ShardingTableCreator<>));
|
||||
|
@ -109,6 +113,7 @@ namespace ShardingCore
|
|||
services.TryAddSingleton(typeof(IParallelTableManager<>),typeof(ParallelTableManager<>));
|
||||
services.TryAddSingleton<IRouteTailFactory, RouteTailFactory>();
|
||||
services.TryAddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();
|
||||
services.TryAddSingleton<IReadWriteConnectorFactory, ReadWriteConnectorFactory>();
|
||||
|
||||
//route manage
|
||||
services.TryAddSingleton<IShardingRouteManager, ShardingRouteManager>();
|
||||
|
|
|
@ -98,29 +98,14 @@ namespace ShardingCore.DIExtensions
|
|||
_shardingCoreConfigBuilder.ShardingConfigOption.ReadWriteDefaultEnable,
|
||||
_shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum,
|
||||
_shardingCoreConfigBuilder.ShardingConfigOption.ReadConnStringGetStrategy));
|
||||
bool isLoop = false;
|
||||
var readStrategyEnum = _shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum;
|
||||
|
||||
if ( readStrategyEnum== ReadStrategyEnum.Loop)
|
||||
{
|
||||
isLoop = true;
|
||||
}
|
||||
else if (_shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum == ReadStrategyEnum.Random)
|
||||
{
|
||||
isLoop = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException($"unknow ReadStrategyEnum confgure:{_shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum}");
|
||||
}
|
||||
|
||||
services
|
||||
.AddSingleton<IShardingConnectionStringResolver<TShardingDbContext>,
|
||||
ReadWriteShardingConnectionStringResolver<TShardingDbContext>>(sp =>
|
||||
{
|
||||
|
||||
var readConnString = _shardingCoreConfigBuilder.ShardingConfigOption.ReadConnStringConfigure(sp);
|
||||
var readWriteLoopConnectors = readConnString.Select(o => (IReadWriteConnector)(isLoop ? new ReadWriteLoopConnector(o.Key, o.Value) : new ReadWriteRandomConnector(o.Key, o.Value)));
|
||||
var readWriteConnectorFactory = sp.GetRequiredService<IReadWriteConnectorFactory>();
|
||||
var readConnStrings = _shardingCoreConfigBuilder.ShardingConfigOption.ReadConnStringConfigure(sp);
|
||||
var readWriteLoopConnectors = readConnStrings.Select(o => readWriteConnectorFactory.CreateConnector<TShardingDbContext>(_shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum,o.Key,o.Value));
|
||||
|
||||
return new ReadWriteShardingConnectionStringResolver<TShardingDbContext>(
|
||||
readWriteLoopConnectors);
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ShardingCore.Bootstrapers;
|
||||
using ShardingCore.Core.EntityMetadatas;
|
||||
using ShardingCore.Core.PhysicTables;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualTables;
|
||||
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
|
||||
using ShardingCore.Core.VirtualTables;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.Sharding.ParallelTables;
|
||||
using ShardingCore.TableCreator;
|
||||
|
||||
namespace ShardingCore.DynamicDataSources
|
||||
{
|
||||
public interface IDefaultDataSourceInitializer<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
|
||||
{
|
||||
void InitConfigure(string dataSourceName, string connectionString);
|
||||
}
|
||||
|
||||
public class DefaultDataSourceInitializer<TShardingDbContext>: IDefaultDataSourceInitializer<TShardingDbContext> where TShardingDbContext:DbContext,IShardingDbContext
|
||||
{
|
||||
private readonly IRouteTailFactory _routeTailFactory;
|
||||
private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager;
|
||||
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
|
||||
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
|
||||
private readonly IShardingTableCreator<TShardingDbContext> _tableCreator;
|
||||
private readonly ILogger<DefaultDataSourceInitializer<TShardingDbContext>> _logger;
|
||||
private readonly IShardingConfigOption _shardingConfigOption;
|
||||
public DefaultDataSourceInitializer(IEnumerable<IShardingConfigOption> shardingConfigOptions,
|
||||
IRouteTailFactory routeTailFactory, IVirtualTableManager<TShardingDbContext> virtualTableManager,
|
||||
IEntityMetadataManager<TShardingDbContext> entityMetadataManager,
|
||||
IShardingTableCreator<TShardingDbContext> shardingTableCreator,
|
||||
IVirtualDataSource<TShardingDbContext> virtualDataSource,
|
||||
ILogger<DefaultDataSourceInitializer<TShardingDbContext>> logger)
|
||||
{
|
||||
_shardingConfigOption =
|
||||
shardingConfigOptions.FirstOrDefault(o => o.ShardingDbContextType == typeof(TShardingDbContext))??throw new ArgumentNullException($"{nameof(IShardingConfigOption)} cant been registered {typeof(TShardingDbContext)}");
|
||||
_routeTailFactory = routeTailFactory;
|
||||
_virtualTableManager = virtualTableManager;
|
||||
_entityMetadataManager = entityMetadataManager;
|
||||
_tableCreator = shardingTableCreator;
|
||||
_virtualDataSource = virtualDataSource;
|
||||
_logger = logger;
|
||||
}
|
||||
public void InitConfigure(string dataSourceName,string connectionString)
|
||||
{
|
||||
using (var serviceScope = ShardingContainer.ServiceProvider.CreateScope())
|
||||
{
|
||||
_virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
|
||||
using var context =
|
||||
(DbContext)serviceScope.ServiceProvider.GetService(_shardingConfigOption.ShardingDbContextType);
|
||||
if (_shardingConfigOption.EnsureCreatedWithOutShardingTable)
|
||||
EnsureCreated(context, dataSourceName);
|
||||
foreach (var entity in context.Model.GetEntityTypes())
|
||||
{
|
||||
var entityType = entity.ClrType;
|
||||
|
||||
if (_virtualDataSource.IsDefault(dataSourceName))
|
||||
{
|
||||
if (_entityMetadataManager.IsShardingTable(entityType))
|
||||
{
|
||||
var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
|
||||
//创建表
|
||||
CreateDataTable(dataSourceName, virtualTable);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_entityMetadataManager.IsShardingDataSource(entityType))
|
||||
{
|
||||
var virtualDataSourceRoute = _virtualDataSource.GetRoute(entityType);
|
||||
if (virtualDataSourceRoute.GetAllDataSourceNames().Contains(dataSourceName))
|
||||
{
|
||||
if (_entityMetadataManager.IsShardingTable(entityType))
|
||||
{
|
||||
var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
|
||||
//创建表
|
||||
CreateDataTable(dataSourceName, virtualTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (_shardingConfigOption.NeedCreateTable(entityType))
|
||||
{
|
||||
_tableCreator.CreateTable(dataSourceName, entityType, string.Empty);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private void CreateDataTable(string dataSourceName, IVirtualTable virtualTable)
|
||||
{
|
||||
var entityMetadata = virtualTable.EntityMetadata;
|
||||
foreach (var tail in virtualTable.GetVirtualRoute().GetAllTails())
|
||||
{
|
||||
if (NeedCreateTable(entityMetadata))
|
||||
{
|
||||
try
|
||||
{
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
_tableCreator.CreateTable(dataSourceName, entityMetadata.EntityType, tail);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (!_shardingConfigOption.IgnoreCreateTableError.GetValueOrDefault())
|
||||
{
|
||||
_logger.LogWarning(e,
|
||||
$"table :{virtualTable.GetVirtualTableName()}{entityMetadata.TableSeparator}{tail} will created.");
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
//添加物理表
|
||||
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
private bool NeedCreateTable(EntityMetadata entityMetadata)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.Value)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
}
|
||||
}
|
||||
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
|
||||
{
|
||||
if (entityMetadata.AutoCreateDataSourceTable.Value)
|
||||
return entityMetadata.AutoCreateDataSourceTable.Value;
|
||||
else
|
||||
{
|
||||
if (entityMetadata.AutoCreateTable.HasValue)
|
||||
return entityMetadata.AutoCreateTable.Value;
|
||||
}
|
||||
}
|
||||
|
||||
return _shardingConfigOption.CreateShardingTableOnStart.GetValueOrDefault();
|
||||
}
|
||||
private void EnsureCreated(DbContext context, string dataSourceName)
|
||||
{
|
||||
if (context is IShardingDbContext shardingDbContext)
|
||||
{
|
||||
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, _routeTailFactory.Create(string.Empty));
|
||||
|
||||
var isDefault = _virtualDataSource.IsDefault(dataSourceName);
|
||||
|
||||
var modelCacheSyncObject = dbContext.GetModelCacheSyncObject();
|
||||
|
||||
var acquire = Monitor.TryEnter(modelCacheSyncObject, TimeSpan.FromSeconds(3));
|
||||
if (!acquire)
|
||||
{
|
||||
throw new ShardingCoreException("cant get modelCacheSyncObject lock");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (isDefault)
|
||||
{
|
||||
dbContext.RemoveDbContextRelationModelThatIsShardingTable();
|
||||
}
|
||||
else
|
||||
{
|
||||
dbContext.RemoveDbContextAllRelationModel();
|
||||
}
|
||||
dbContext.Database.EnsureCreated();
|
||||
dbContext.RemoveModelCache();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Monitor.Exit(modelCacheSyncObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using System;
|
||||
|
||||
namespace ShardingCore.DynamicDataSources
|
||||
{
|
||||
public class DynamicDataSourceHelper
|
||||
{
|
||||
private DynamicDataSourceHelper()
|
||||
{
|
||||
throw new InvalidOperationException($"{nameof(DynamicDataSourceHelper)} create instance");
|
||||
}
|
||||
|
||||
public static void DynamicAppendDataSource<TShardingDbContext>(string dataSourceName, string connectionString) where TShardingDbContext:DbContext,IShardingDbContext
|
||||
{
|
||||
var defaultDataSourceInitializer = ShardingContainer.GetService<IDefaultDataSourceInitializer<TShardingDbContext>>();
|
||||
defaultDataSourceInitializer.InitConfigure(dataSourceName, connectionString);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -71,7 +71,7 @@ namespace ShardingCore.Extensions
|
|||
/// 移除所有的没有分片的表
|
||||
/// </summary>
|
||||
/// <param name="dbContext"></param>
|
||||
public static void RemoveDbContextAllRelationModelThatIsNoSharding(this DbContext dbContext)
|
||||
public static void RemoveDbContextAllRelationModel(this DbContext dbContext)
|
||||
{
|
||||
#if EFCORE6
|
||||
|
||||
|
@ -81,34 +81,18 @@ namespace ShardingCore.Extensions
|
|||
|
||||
var contextModel = dbContext.Model as Model;
|
||||
#endif
|
||||
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(dbContext.GetType()));
|
||||
|
||||
#if EFCORE6
|
||||
var contextModelRelationalModel = contextModel.GetRelationalModel() as RelationalModel;
|
||||
var valueTuples =
|
||||
contextModelRelationalModel.Tables.Where(o => o.Value.EntityTypeMappings.Any(m => entityMetadataManager.IsShardingTable(m.EntityType.ClrType) ||entityMetadataManager.TryGet(m.EntityType.ClrType)==null)).Select(o => o.Key).ToList();
|
||||
for (int i = 0; i < valueTuples.Count; i++)
|
||||
{
|
||||
contextModelRelationalModel.Tables.Remove(valueTuples[i]);
|
||||
}
|
||||
contextModelRelationalModel.Tables.Clear();
|
||||
#endif
|
||||
#if EFCORE5
|
||||
var contextModelRelationalModel = contextModel.RelationalModel as RelationalModel;
|
||||
var valueTuples =
|
||||
contextModelRelationalModel.Tables.Where(o => o.Value.EntityTypeMappings.Any(m => entityMetadataManager.IsShardingTable(m.EntityType.ClrType)||entityMetadataManager.TryGet(m.EntityType.ClrType)==null)).Select(o => o.Key).ToList();
|
||||
for (int i = 0; i < valueTuples.Count; i++)
|
||||
{
|
||||
contextModelRelationalModel.Tables.Remove(valueTuples[i]);
|
||||
}
|
||||
contextModelRelationalModel.Tables.Clear();
|
||||
#endif
|
||||
#if EFCORE2 || EFCORE3
|
||||
var entityTypes =
|
||||
contextModel.GetFieldValue("_entityTypes") as SortedDictionary<string, EntityType>;
|
||||
var list = entityTypes.Where(o=>entityMetadataManager.IsShardingTable(o.Value.ClrType)||entityMetadataManager.TryGet(o.Value.ClrType)==null).Select(o=>o.Key).ToList();
|
||||
for (int i = 0; i < list.Count; i++)
|
||||
{
|
||||
entityTypes.Remove(list[i]);
|
||||
}
|
||||
entityTypes.Clear();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
|
||||
namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
|
||||
{
|
||||
public interface IReadWriteConnectorFactory
|
||||
{
|
||||
IReadWriteConnector CreateConnector<TShardingDbContext>(ReadStrategyEnum strategy, string dataSourceName, IEnumerable<string> connectionStrings) where TShardingDbContext:DbContext,IShardingDbContext;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
|
||||
|
||||
namespace ShardingCore.Sharding.ReadWriteConfigurations
|
||||
{
|
||||
public class ReadWriteConnectorFactory: IReadWriteConnectorFactory
|
||||
{
|
||||
public IReadWriteConnector CreateConnector<TShardingDbContext>(ReadStrategyEnum strategy,string dataSourceName, IEnumerable<string> connectionStrings) where TShardingDbContext : DbContext, IShardingDbContext
|
||||
{
|
||||
var readWriteOptions = ShardingContainer.GetService<IReadWriteOptions<TShardingDbContext>>();
|
||||
if (readWriteOptions == null)
|
||||
throw new ShardingCoreInvalidOperationException(
|
||||
"cant create read write connector should use read write");
|
||||
|
||||
if (readWriteOptions.ReadStrategy == ReadStrategyEnum.Loop)
|
||||
{
|
||||
return new ReadWriteLoopConnector(dataSourceName, connectionStrings);
|
||||
}
|
||||
else if (readWriteOptions.ReadStrategy == ReadStrategyEnum.Random)
|
||||
{
|
||||
return new ReadWriteRandomConnector(dataSourceName, connectionStrings);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException(
|
||||
$"unknown read write strategy:[{readWriteOptions.ReadStrategy}]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
|
|||
new ConcurrentDictionary<string, IReadWriteConnector>();
|
||||
|
||||
private readonly IReadWriteOptions<TShardingDbContext> _readWriteOptions;
|
||||
private readonly IReadWriteConnectorFactory _readWriteConnectorFactory;
|
||||
public ReadWriteShardingConnectionStringResolver(IEnumerable<IReadWriteConnector> connectors)
|
||||
{
|
||||
var enumerator = connectors.GetEnumerator();
|
||||
|
@ -28,6 +29,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
|
|||
}
|
||||
|
||||
_readWriteOptions = ShardingContainer.GetService<IReadWriteOptions<TShardingDbContext>>();
|
||||
_readWriteConnectorFactory = ShardingContainer.GetService<IReadWriteConnectorFactory>();
|
||||
}
|
||||
|
||||
public bool ContainsReadWriteDataSourceName(string dataSourceName)
|
||||
|
@ -46,23 +48,13 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
|
|||
{
|
||||
if (!_connectors.TryGetValue(dataSourceName, out var connector))
|
||||
{
|
||||
if (_readWriteOptions.ReadStrategy == ReadStrategyEnum.Loop)
|
||||
{
|
||||
connector= new ReadWriteLoopConnector(dataSourceName, new List<string> { connectionString });
|
||||
_connectors.TryAdd(dataSourceName, connector);
|
||||
return true;
|
||||
}
|
||||
else if (_readWriteOptions.ReadStrategy == ReadStrategyEnum.Random)
|
||||
{
|
||||
connector= new ReadWriteRandomConnector(dataSourceName, new List<string> { connectionString });
|
||||
_connectors.TryAdd(dataSourceName, connector);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException(
|
||||
$"unknown read write strategy:[{_readWriteOptions.ReadStrategy}]");
|
||||
}
|
||||
connector = _readWriteConnectorFactory.CreateConnector<TShardingDbContext>(_readWriteOptions.ReadStrategy,
|
||||
dataSourceName, new List<string>()
|
||||
{
|
||||
connectionString
|
||||
});
|
||||
_connectors.TryAdd(dataSourceName, connector);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -224,9 +224,10 @@ namespace ShardingCore.Test
|
|||
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
|
||||
Assert.Equal(x1x1, x2x2);
|
||||
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
|
||||
var succeedAddConnectionString = _shardingConnectionStringResolver.AddConnectionString("A", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;");
|
||||
var succeedAddConnectionString = _shardingConnectionStringResolver.AddConnectionString("X", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;");
|
||||
Assert.True(succeedAddConnectionString);
|
||||
|
||||
var connectionString = _shardingConnectionStringResolver.GetConnectionString("X");
|
||||
Assert.Equal("Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;",connectionString);
|
||||
}
|
||||
|
||||
public class SequenceClass
|
||||
|
|
Loading…
Reference in New Issue