修复动态添加读写分离从库bug

This commit is contained in:
xuejiaming 2021-12-10 13:42:54 +08:00
parent af905a7dd9
commit fe7275882d
7 changed files with 188 additions and 8 deletions

View File

@ -1,9 +1,9 @@
:start :start
::定义版本 ::定义版本
set EFCORE2=2.3.1.75 set EFCORE2=2.3.1.76
set EFCORE3=3.3.1.75 set EFCORE3=3.3.1.76
set EFCORE5=5.3.1.75 set EFCORE5=5.3.1.76
set EFCORE6=6.3.1.75 set EFCORE6=6.3.1.76
::删除所有bin与obj下的文件 ::删除所有bin与obj下的文件
@echo off @echo off

View File

@ -0,0 +1,152 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.TableCreator;
namespace Sample.SqlServerShardingAll
{
public class DbContextHelper
{
public static void EnsureSubDbCreatedAndCreateSubTables(string dataSourceName, string connectionString, Type entityType, int sum4SubTable)
{
var _entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<MyDbContext>>();
var _virtualDataSource = ShardingContainer.GetService<IVirtualDataSource<MyDbContext>>();
var _virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<MyDbContext>>();
var _tableCreator = ShardingContainer.GetService<IShardingTableCreator<MyDbContext>>();
using (var serviceScope = ShardingContainer.ServiceProvider.CreateScope())
{
_virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
var virtualDataSourceRoute = _virtualDataSource.GetRoute(entityType);
virtualDataSourceRoute.AddDataSourceName(dataSourceName);
using var context = (DbContext)serviceScope.ServiceProvider.GetService(typeof(MyDbContext));
EnsureCreated(context, dataSourceName);
foreach (var entity in context.Model.GetEntityTypes())
{
if (entity.ClrType != entityType)
{
continue;
}
if (_entityMetadataManager.IsShardingTable(entityType))
{
var virtualTable = _virtualTableManager.GetVirtualTable(entityType);
//创建表
CreateDataTable(dataSourceName, virtualTable, sum4SubTable);
}
else
{
_tableCreator.CreateTable(dataSourceName, entityType, string.Empty);
}
}
}
}
private static void CreateDataTable(string dataSourceName, IVirtualTable virtualTable, int sum4SubTable)
{
var _tableCreator = ShardingContainer.GetService<IShardingTableCreator<MyDbContext>>();
var entityMetadata = virtualTable.EntityMetadata;
int currentCount = 0;
foreach (var tail in virtualTable.GetVirtualRoute().GetAllTails())
{
if (currentCount >= sum4SubTable)
{
break;
}
if (NeedCreateTable(entityMetadata))
{
try
{
//添加物理表
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
_tableCreator.CreateTable(dataSourceName, entityMetadata.EntityType, tail);
}
catch (Exception ex)
{
//if (!_shardingConfigOption.IgnoreCreateTableError.GetValueOrDefault())
//{
// _logger.LogWarning(ex,
// $"table :{virtualTable.GetVirtualTableName()}{entityMetadata.TableSeparator}{tail} will created.");
//}
//TODO: 记录异常日志
System.Diagnostics.Trace.TraceError($"DbContextHelper-->CreateDataTable ERROR: {ex}");
}
}
else
{
//添加物理表
virtualTable.AddPhysicTable(new DefaultPhysicTable(virtualTable, tail));
}
currentCount++;
}
}
private static bool NeedCreateTable(EntityMetadata entityMetadata)
{
if (entityMetadata.AutoCreateTable.HasValue)
{
if (entityMetadata.AutoCreateTable.Value)
return entityMetadata.AutoCreateTable.Value;
else
{
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
return entityMetadata.AutoCreateDataSourceTable.Value;
}
}
if (entityMetadata.AutoCreateDataSourceTable.HasValue)
{
if (entityMetadata.AutoCreateDataSourceTable.Value)
return entityMetadata.AutoCreateDataSourceTable.Value;
else
{
if (entityMetadata.AutoCreateTable.HasValue)
return entityMetadata.AutoCreateTable.Value;
}
}
//return _shardingConfigOption.CreateShardingTableOnStart.GetValueOrDefault();
return true;
}
private static void EnsureCreated(DbContext context, string dataSourceName)
{
var _routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
if (context is IShardingDbContext shardingDbContext)
{
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, _routeTailFactory.Create(string.Empty));
var modelCacheSyncObject = dbContext.GetModelCacheSyncObject();
var acquire = System.Threading.Monitor.TryEnter(modelCacheSyncObject, TimeSpan.FromSeconds(3));
if (!acquire)
{
throw new ShardingCoreException("cant get modelCacheSyncObject lock");
}
try
{
dbContext.RemoveDbContextRelationModelThatIsShardingTable();
dbContext.Database.EnsureCreated();
dbContext.RemoveModelCache();
}
finally
{
System.Threading.Monitor.Exit(modelCacheSyncObject);
}
}
}
}
}

View File

@ -96,6 +96,7 @@ namespace ShardingCore.DIExtensions
new ReadWriteOptions<TShardingDbContext>( new ReadWriteOptions<TShardingDbContext>(
_shardingCoreConfigBuilder.ShardingConfigOption.ReadWriteDefaultPriority, _shardingCoreConfigBuilder.ShardingConfigOption.ReadWriteDefaultPriority,
_shardingCoreConfigBuilder.ShardingConfigOption.ReadWriteDefaultEnable, _shardingCoreConfigBuilder.ShardingConfigOption.ReadWriteDefaultEnable,
_shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum,
_shardingCoreConfigBuilder.ShardingConfigOption.ReadConnStringGetStrategy)); _shardingCoreConfigBuilder.ShardingConfigOption.ReadConnStringGetStrategy));
bool isLoop = false; bool isLoop = false;
var readStrategyEnum = _shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum; var readStrategyEnum = _shardingCoreConfigBuilder.ShardingConfigOption.ReadStrategyEnum;

View File

@ -24,6 +24,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
/// 默认是否开启读写分离 /// 默认是否开启读写分离
/// </summary> /// </summary>
bool ReadWriteSupport { get; } bool ReadWriteSupport { get; }
ReadStrategyEnum ReadStrategy { get; }
ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; } ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; }
} }
} }

View File

@ -18,10 +18,11 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
public class ReadWriteOptions<TShardingDbContext> : IReadWriteOptions<TShardingDbContext> public class ReadWriteOptions<TShardingDbContext> : IReadWriteOptions<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
{ {
public ReadWriteOptions(int readWritePriority, bool readWriteSupport, ReadConnStringGetStrategyEnum readConnStringGetStrategy) public ReadWriteOptions(int readWritePriority, bool readWriteSupport, ReadStrategyEnum readStrategy, ReadConnStringGetStrategyEnum readConnStringGetStrategy)
{ {
ReadWritePriority = readWritePriority; ReadWritePriority = readWritePriority;
ReadWriteSupport = readWriteSupport; ReadWriteSupport = readWriteSupport;
ReadStrategy = readStrategy;
ReadConnStringGetStrategy = readConnStringGetStrategy; ReadConnStringGetStrategy = readConnStringGetStrategy;
} }
public Type ShardingDbContextType => typeof(TShardingDbContext); public Type ShardingDbContextType => typeof(TShardingDbContext);
@ -33,6 +34,8 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
/// 默认是否开启读写分离 /// 默认是否开启读写分离
/// </summary> /// </summary>
public bool ReadWriteSupport { get; } public bool ReadWriteSupport { get; }
public ReadStrategyEnum ReadStrategy { get; }
public ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; } public ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; }
} }
} }

View File

@ -16,6 +16,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
private readonly ConcurrentDictionary<string, IReadWriteConnector> _connectors = private readonly ConcurrentDictionary<string, IReadWriteConnector> _connectors =
new ConcurrentDictionary<string, IReadWriteConnector>(); new ConcurrentDictionary<string, IReadWriteConnector>();
private readonly IReadWriteOptions<TShardingDbContext> _readWriteOptions;
public ReadWriteShardingConnectionStringResolver(IEnumerable<IReadWriteConnector> connectors) public ReadWriteShardingConnectionStringResolver(IEnumerable<IReadWriteConnector> connectors)
{ {
var enumerator = connectors.GetEnumerator(); var enumerator = connectors.GetEnumerator();
@ -26,6 +27,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
_connectors.TryAdd(currentConnector.DataSourceName, currentConnector); _connectors.TryAdd(currentConnector.DataSourceName, currentConnector);
} }
_readWriteOptions = ShardingContainer.GetService<IReadWriteOptions<TShardingDbContext>>();
} }
public bool ContainsReadWriteDataSourceName(string dataSourceName) public bool ContainsReadWriteDataSourceName(string dataSourceName)
@ -43,8 +45,24 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
public bool AddConnectionString(string dataSourceName, string connectionString) public bool AddConnectionString(string dataSourceName, string connectionString)
{ {
if (!_connectors.TryGetValue(dataSourceName, out var connector)) if (!_connectors.TryGetValue(dataSourceName, out var connector))
throw new ShardingCoreInvalidOperationException($"read write connector not found, data source name:[{dataSourceName}]"); {
return connector.AddConnectionString(connectionString); if (_readWriteOptions.ReadStrategy == ReadStrategyEnum.Loop)
{
connector= new ReadWriteLoopConnector(dataSourceName, new List<string> { connectionString });
}
else if (_readWriteOptions.ReadStrategy == ReadStrategyEnum.Random)
{
connector= new ReadWriteLoopConnector(dataSourceName, new List<string> { connectionString });
}
throw new ShardingCoreInvalidOperationException(
$"unknown read write strategy:[{_readWriteOptions.ReadStrategy}]");
}
else
{
return connector.AddConnectionString(connectionString);
}
} }
} }
} }

View File

@ -51,12 +51,13 @@ namespace ShardingCore.Test
private readonly IShardingTableCreator<ShardingDefaultDbContext> _shardingTableCreator; private readonly IShardingTableCreator<ShardingDefaultDbContext> _shardingTableCreator;
private readonly IShardingReadWriteManager _shardingReadWriteManager; private readonly IShardingReadWriteManager _shardingReadWriteManager;
private readonly IRouteTailFactory _routeTailFactory; private readonly IRouteTailFactory _routeTailFactory;
private readonly IShardingConnectionStringResolver<ShardingDefaultDbContext> _shardingConnectionStringResolver;
public ShardingTest(ShardingDefaultDbContext virtualDbContext, IShardingRouteManager shardingRouteManager, IConfiguration configuration, public ShardingTest(ShardingDefaultDbContext virtualDbContext, IShardingRouteManager shardingRouteManager, IConfiguration configuration,
IEntityMetadataManager<ShardingDefaultDbContext> entityMetadataManager, IEntityMetadataManager<ShardingDefaultDbContext> entityMetadataManager,
IShardingComparer<ShardingDefaultDbContext> shardingComparer, IVirtualDataSource<ShardingDefaultDbContext> virtualDataSource, IShardingComparer<ShardingDefaultDbContext> shardingComparer, IVirtualDataSource<ShardingDefaultDbContext> virtualDataSource,
IVirtualTableManager<ShardingDefaultDbContext> virtualTableManager, IVirtualTableManager<ShardingDefaultDbContext> virtualTableManager,
IShardingTableCreator<ShardingDefaultDbContext> shardingTableCreator, IShardingReadWriteManager shardingReadWriteManager,IRouteTailFactory routeTailFactory) IShardingTableCreator<ShardingDefaultDbContext> shardingTableCreator, IShardingReadWriteManager shardingReadWriteManager,IRouteTailFactory routeTailFactory,IShardingConnectionStringResolver<ShardingDefaultDbContext> shardingConnectionStringResolver)
{ {
_virtualDbContext = virtualDbContext; _virtualDbContext = virtualDbContext;
_shardingRouteManager = shardingRouteManager; _shardingRouteManager = shardingRouteManager;
@ -69,6 +70,7 @@ namespace ShardingCore.Test
_shardingTableCreator = shardingTableCreator; _shardingTableCreator = shardingTableCreator;
_shardingReadWriteManager = shardingReadWriteManager; _shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory; _routeTailFactory = routeTailFactory;
_shardingConnectionStringResolver = shardingConnectionStringResolver;
//var dataSource = ShardingContainer.GetService<IVirtualDataSource<ShardingDefaultDbContext>>(); //var dataSource = ShardingContainer.GetService<IVirtualDataSource<ShardingDefaultDbContext>>();
//dataSource.AddPhysicDataSource(new DefaultPhysicDataSource("E", "XXXXX", false)); //dataSource.AddPhysicDataSource(new DefaultPhysicDataSource("E", "XXXXX", false));
@ -222,6 +224,9 @@ namespace ShardingCore.Test
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), }); { new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2); Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode()); Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
var succeedAddConnectionString = _shardingConnectionStringResolver.AddConnectionString("A", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;");
Assert.True(succeedAddConnectionString);
} }
public class SequenceClass public class SequenceClass