[#146]完成指定读写分离读库,有助于读写分离用户的读库设置链接发布x.4.3.2

This commit is contained in:
xuejiaming 2022-05-11 09:49:50 +08:00
parent 98f570cc44
commit 58ee2a7ba3
32 changed files with 276 additions and 87 deletions

View File

@ -1,9 +1,9 @@
:start
::定义版本
set EFCORE2=2.4.3.1
set EFCORE3=3.4.3.1
set EFCORE5=5.4.3.1
set EFCORE6=6.4.3.1
set EFCORE2=2.4.3.2
set EFCORE3=3.4.3.2
set EFCORE5=5.4.3.2
set EFCORE6=6.4.3.2
::删除所有bin与obj下的文件
@echo off

View File

@ -33,7 +33,7 @@ namespace Sample.MultiConfig.Controllers
public override int Priority { get; }
public override string DefaultDataSourceName { get; }
public override string DefaultConnectionString { get; }
public override IDictionary<string, IEnumerable<string>> ReadWriteSeparationConfigs { get; }
public override IDictionary<string, ReadNode[]> ReadWriteNodeSeparationConfigs { get; }
public override ReadStrategyEnum? ReadStrategy { get; }
public override bool? ReadWriteDefaultEnable { get; }
public override int? ReadWriteDefaultPriority { get; }

View File

@ -20,6 +20,7 @@ using ShardingCore.Extensions.ShardingQueryableExtensions;
using ShardingCore.Sharding.Abstractions;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.EFCores;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
namespace Sample.SqlServer.Controllers
{
@ -34,12 +35,14 @@ namespace Sample.SqlServer.Controllers
private readonly DefaultShardingDbContext _defaultTableDbContext;
private readonly IShardingRouteManager _shardingRouteManager;
private readonly IShardingReadWriteManager _readWriteManager;
public ValuesController(DefaultShardingDbContext defaultTableDbContext, IShardingRouteManager shardingRouteManager)
public ValuesController(DefaultShardingDbContext defaultTableDbContext, IShardingRouteManager shardingRouteManager,IShardingReadWriteManager readWriteManager)
{
_defaultTableDbContext = defaultTableDbContext;
_ = defaultTableDbContext.Model;
_shardingRouteManager = shardingRouteManager;
_readWriteManager = readWriteManager;
}
[HttpGet]
@ -364,5 +367,19 @@ namespace Sample.SqlServer.Controllers
return Ok(new { xxx, xxx1 });
}
[HttpGet]
public async Task<IActionResult> Get5(string readNodeName)
{
using (_readWriteManager.CreateScope<DefaultShardingDbContext>())
{
_readWriteManager.GetCurrent<DefaultShardingDbContext>().SetReadWriteSeparation(100,true);
_readWriteManager.GetCurrent<DefaultShardingDbContext>().AddDataSourceReadNode("A", readNodeName);
var xxxaaa = await _defaultTableDbContext.Set<SysUserSalary>().FirstOrDefaultAsync();
}
return Ok();
}
}
}

View File

@ -10,9 +10,11 @@ using Sample.SqlServer.UnionAllMerge;
using ShardingCore;
using ShardingCore.TableExists;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ShardingComparision;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
@ -55,6 +57,24 @@ namespace Sample.SqlServer
op.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"
);
op.AddReadWriteNodeSeparation(sp =>new Dictionary<string, IEnumerable<ReadNode>>()
{
{"A",new List<ReadNode>()
{
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A2","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A3","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A4","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A5","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A6","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("A1","Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;"),
new ReadNode("X","Data Source=localhost;Initial Catalog=ShardingCoreDBXA123;Integrated Security=True;"),
}}
},ReadStrategyEnum.Loop);
}).EnsureConfig();
//services.AddShardingDbContext<DefaultShardingDbContext1>(
// (conn, o) =>

View File

@ -85,6 +85,20 @@ namespace ShardingCore.Core.ShardingConfigurations
ShardingReadWriteSeparationOptions.DefaultPriority= defaultPriority;
ShardingReadWriteSeparationOptions.ReadConnStringGetStrategy= readConnStringGetStrategy;
}
public void AddReadWriteNodeSeparation(
Func<IServiceProvider, IDictionary<string, IEnumerable<ReadNode>>> readWriteNodeSeparationConfigure,
ReadStrategyEnum readStrategyEnum,
bool defaultEnable = false,
int defaultPriority = 10,
ReadConnStringGetStrategyEnum readConnStringGetStrategy = ReadConnStringGetStrategyEnum.LatestFirstTime)
{
ShardingReadWriteSeparationOptions = new ShardingReadWriteSeparationOptions();
ShardingReadWriteSeparationOptions.ReadWriteNodeSeparationConfigure= readWriteNodeSeparationConfigure ?? throw new ArgumentNullException(nameof(readWriteNodeSeparationConfigure));
ShardingReadWriteSeparationOptions.ReadStrategy = readStrategyEnum;
ShardingReadWriteSeparationOptions.DefaultEnable=defaultEnable;
ShardingReadWriteSeparationOptions.DefaultPriority= defaultPriority;
ShardingReadWriteSeparationOptions.ReadConnStringGetStrategy= readConnStringGetStrategy;
}
/// <summary>
/// 多个DbContext事务传播委托

View File

@ -7,6 +7,7 @@ namespace ShardingCore.Core.ShardingConfigurations
public class ShardingReadWriteSeparationOptions
{
public Func<IServiceProvider, IDictionary<string, IEnumerable<string>>> ReadWriteSeparationConfigure { get; set; }
public Func<IServiceProvider, IDictionary<string, IEnumerable<ReadNode>>> ReadWriteNodeSeparationConfigure { get; set; }
public ReadStrategyEnum ReadStrategy { get; set; } = ReadStrategyEnum.Loop;
public bool DefaultEnable { get; set; } = false;

View File

@ -28,7 +28,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
public abstract string DefaultDataSourceName { get; }
public abstract string DefaultConnectionString { get; }
public virtual IDictionary<string, string> ExtraDataSources { get; }=new ConcurrentDictionary<string, string>();
public virtual IDictionary<string, IEnumerable<string>> ReadWriteSeparationConfigs { get; }
public virtual IDictionary<string, ReadNode[]> ReadWriteNodeSeparationConfigs { get; }
public virtual ReadStrategyEnum? ReadStrategy { get; }
public virtual bool? ReadWriteDefaultEnable { get; }
public virtual int? ReadWriteDefaultPriority { get; }
@ -52,7 +52,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
public virtual bool UseReadWriteSeparation()
{
return ReadWriteSeparationConfigs!=null;
return ReadWriteNodeSeparationConfigs!=null;
}
}

View File

@ -42,7 +42,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
/// <summary>
/// null表示不启用读写分离,if null mean not enable read write
/// </summary>
IDictionary<string, IEnumerable<string>> ReadWriteSeparationConfigs { get; }
IDictionary<string, ReadNode[]> ReadWriteNodeSeparationConfigs { get; }
ReadStrategyEnum? ReadStrategy { get; }
bool? ReadWriteDefaultEnable { get; }

View File

@ -11,6 +11,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.ShardingConfigurations.Abstractions;
@ -28,7 +29,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
public override string DefaultDataSourceName { get; }
public override string DefaultConnectionString { get; }
public override IDictionary<string, string> ExtraDataSources { get; }
public override IDictionary<string, IEnumerable<string>> ReadWriteSeparationConfigs { get; }
public override IDictionary<string, ReadNode[]> ReadWriteNodeSeparationConfigs { get; }
public override ReadStrategyEnum? ReadStrategy { get; }
public override bool? ReadWriteDefaultEnable { get; }
public override int? ReadWriteDefaultPriority { get; }
@ -53,7 +54,23 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
new EmptyTableEnsureManager<TShardingDbContext>();
if (options.ShardingReadWriteSeparationOptions != null)
{
ReadWriteSeparationConfigs = options.ShardingReadWriteSeparationOptions.ReadWriteSeparationConfigure?.Invoke(serviceProvider);
if (options.ShardingReadWriteSeparationOptions.ReadWriteNodeSeparationConfigure != null)
{
var readConfig = options.ShardingReadWriteSeparationOptions.ReadWriteNodeSeparationConfigure?.Invoke(serviceProvider);
if (readConfig != null)
{
ReadWriteNodeSeparationConfigs = readConfig.ToDictionary(kv=>kv.Key,kv=>kv.Value.ToArray());
}
}
else
{
var nodeConfig = options.ShardingReadWriteSeparationOptions.ReadWriteSeparationConfigure?.Invoke(serviceProvider);
if (nodeConfig != null)
{
ReadWriteNodeSeparationConfigs = nodeConfig.ToDictionary(kv => kv.Key,
kv => kv.Value.Select(o => new ReadNode(Guid.NewGuid().ToString("n"), o)).ToArray());
}
}
ReadStrategy = options.ShardingReadWriteSeparationOptions.ReadStrategy;
ReadWriteDefaultEnable = options.ShardingReadWriteSeparationOptions.DefaultEnable;
ReadWriteDefaultPriority = options.ShardingReadWriteSeparationOptions.DefaultPriority;

View File

@ -61,14 +61,15 @@ namespace ShardingCore.DynamicDataSources
/// <param name="virtualDataSource"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(IVirtualDataSource<TShardingDbContext> virtualDataSource, string dataSourceName,
string connectionString) where TShardingDbContext : DbContext, IShardingDbContext
string connectionString,string readNodeName = null) where TShardingDbContext : DbContext, IShardingDbContext
{
if (virtualDataSource.ConnectionStringManager is IReadWriteAppendConnectionString
if (virtualDataSource.ConnectionStringManager is IReadWriteConnectionStringManager
readWriteAppendConnectionString)
{
readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString);
readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString, readNodeName);
return;
}
@ -82,12 +83,13 @@ namespace ShardingCore.DynamicDataSources
/// <param name="configId"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(string configId, string dataSourceName,
string connectionString) where TShardingDbContext : DbContext, IShardingDbContext
string connectionString, string readNodeName=null) where TShardingDbContext : DbContext, IShardingDbContext
{
var virtualDataSourceManager = ShardingContainer.GetRequiredVirtualDataSourceManager<TShardingDbContext>();
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(configId);
DynamicAppendReadWriteConnectionString(virtualDataSource, dataSourceName, connectionString);
DynamicAppendReadWriteConnectionString(virtualDataSource, dataSourceName, connectionString,readNodeName);
}
}
}

View File

@ -82,14 +82,15 @@ namespace ShardingCore.Helpers
/// <param name="virtualDataSource"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(IVirtualDataSource<TShardingDbContext> virtualDataSource, string dataSourceName,
string connectionString) where TShardingDbContext : DbContext, IShardingDbContext
string connectionString, string readNodeName=null) where TShardingDbContext : DbContext, IShardingDbContext
{
if (virtualDataSource.ConnectionStringManager is IReadWriteAppendConnectionString
if (virtualDataSource.ConnectionStringManager is IReadWriteConnectionStringManager
readWriteAppendConnectionString)
{
readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString);
readWriteAppendConnectionString.AddReadConnectionString(dataSourceName, connectionString, readNodeName);
return;
}
@ -103,12 +104,13 @@ namespace ShardingCore.Helpers
/// <param name="configId"></param>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
public static void DynamicAppendReadWriteConnectionString<TShardingDbContext>(string configId, string dataSourceName,
string connectionString) where TShardingDbContext : DbContext, IShardingDbContext
string connectionString, string readNodeName = null) where TShardingDbContext : DbContext, IShardingDbContext
{
var virtualDataSourceManager = ShardingContainer.GetRequiredVirtualDataSourceManager<TShardingDbContext>();
var virtualDataSource = virtualDataSourceManager.GetVirtualDataSource(configId);
DynamicAppendReadWriteConnectionString(virtualDataSource, dataSourceName, connectionString);
DynamicAppendReadWriteConnectionString(virtualDataSource, dataSourceName, connectionString, readNodeName);
}
}
}

View File

@ -65,35 +65,50 @@ namespace ShardingCore.Sharding
private string GetReadWriteSeparationConnectString(string dataSourceName)
{
var support = ReadWriteSeparation;
string readNodeName = null;
var hasConfig = false;
var shardingReadWriteContext = _shardingReadWriteManager.GetCurrent<TShardingDbContext>();
if (shardingReadWriteContext != null)
{
support = (ReadWriteSeparationPriority >= shardingReadWriteContext.DefaultPriority)
var dbFirst = ReadWriteSeparationPriority >= shardingReadWriteContext.DefaultPriority;
support = dbFirst
? ReadWriteSeparation
: shardingReadWriteContext.DefaultReadEnable;
if (!dbFirst&& support)
{
hasConfig = shardingReadWriteContext.TryGetDataSourceReadNode(dataSourceName, out readNodeName);
}
}
if (support)
{
return GetReadWriteSeparationConnectString0(dataSourceName);
return GetReadWriteSeparationConnectString0(dataSourceName, hasConfig?readNodeName:null);
}
return GetWriteConnectionString(dataSourceName);
}
private string GetReadWriteSeparationConnectString0(string dataSourceName)
private string GetReadWriteSeparationConnectString0(string dataSourceName,string readNodeName)
{
if (ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestFirstTime)
if (_virtualDataSource.ConnectionStringManager is IReadWriteConnectionStringManager
readWriteConnectionStringManager)
{
if (_cacheConnectionString == null)
_cacheConnectionString = _virtualDataSource.ConnectionStringManager.GetConnectionString(dataSourceName);
return _cacheConnectionString;
}
else if (ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestEveryTime)
{
return _virtualDataSource.ConnectionStringManager.GetConnectionString(dataSourceName);
if (ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestFirstTime)
{
if (_cacheConnectionString == null)
_cacheConnectionString = readWriteConnectionStringManager.GetReadNodeConnectionString(dataSourceName, readNodeName);
return _cacheConnectionString;
}
else if (ReadConnStringGetStrategy == ReadConnStringGetStrategyEnum.LatestEveryTime)
{
return readWriteConnectionStringManager.GetReadNodeConnectionString(dataSourceName,readNodeName);
}
else
{
throw new ShardingCoreInvalidOperationException($"ReadWriteConnectionStringManager ReadConnStringGetStrategy:{ReadConnStringGetStrategy}");
}
}
else
{
throw new ShardingCoreInvalidOperationException($"ReadWriteConnectionStringManager ReadConnStringGetStrategy:{ReadConnStringGetStrategy}");
throw new ShardingCoreInvalidOperationException($"virtual data source connection string manager is not [{nameof(IReadWriteConnectionStringManager)}]");
}
}

View File

@ -6,8 +6,9 @@ using System.Threading.Tasks;
namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
{
public interface IReadWriteAppendConnectionString
public interface IReadWriteConnectionStringManager
{
bool AddReadConnectionString(string dataSourceName,string connectionString);
string GetReadNodeConnectionString(string dataSourceName,string readNodeName);
bool AddReadConnectionString(string dataSourceName,string connectionString, string readNodeName);
}
}

View File

@ -15,13 +15,15 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
/// <summary>
/// 获取链接字符串
/// </summary>
/// <param name="readNodeName">可为null</param>
/// <returns></returns>
public string GetConnectionString();
public string GetConnectionString(string readNodeName);
/// <summary>
/// 添加链接字符串
/// </summary>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <returns></returns>
public bool AddConnectionString(string connectionString);
public bool AddConnectionString(string connectionString, string readNodeName);
}
}

View File

@ -11,6 +11,6 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
public interface IReadWriteConnectorFactory
{
IReadWriteConnector CreateConnector(ReadStrategyEnum strategy, string dataSourceName,
IEnumerable<string> connectionStrings);
ReadNode[] readNodes);
}
}

View File

@ -19,13 +19,20 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
public interface IShardingConnectionStringResolver
{
bool ContainsReadWriteDataSourceName(string dataSourceName);
string GetConnectionString(string dataSourceName);
/// <summary>
/// 获取指定数据源的读连接名称节点
/// </summary>
/// <param name="dataSourceName"></param>
/// <param name="readNodeName">名称不存在报错,如果为null那么就随机获取</param>
/// <returns></returns>
string GetConnectionString(string dataSourceName,string readNodeName);
/// <summary>
/// 添加数据源从库读字符串
/// </summary>
/// <param name="dataSourceName"></param>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <returns></returns>
bool AddConnectionString(string dataSourceName, string connectionString);
bool AddConnectionString(string dataSourceName, string connectionString, string readNodeName);
}
}

View File

@ -12,43 +12,45 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Connectors.Abstractions
{
public abstract class AbstractionReadWriteConnector:IReadWriteConnector
{
protected List<string> ConnectionStrings { get;}
protected List<ReadNode> ReadNodes { get;}
protected int Length { get; private set; }
private object slock = new object();
//private readonly string _tempConnectionString;
//private readonly OneByOneChecker _oneByOneChecker = new OneByOneChecker();
public AbstractionReadWriteConnector(string dataSourceName,IEnumerable<string> connectionStrings)
public AbstractionReadWriteConnector(string dataSourceName,ReadNode[] readNodes)
{
DataSourceName = dataSourceName;
ConnectionStrings = connectionStrings.ToList();
Length = ConnectionStrings.Count;
ReadNodes = readNodes.ToList();
Length = ReadNodes.Count;
//_tempConnectionString = ConnectionStrings[0];
}
public string DataSourceName { get; }
public string GetConnectionString()
public string GetConnectionString(string readNodeName)
{
return DoGetConnectionString();
return DoGetConnectionString(readNodeName);
}
public abstract string DoGetConnectionString();
public abstract string DoGetConnectionString(string readNodeName);
/// <summary>
/// 动态添加数据源
/// </summary>
/// <param name="connectionString"></param>
/// <param name="readNodeName"></param>
/// <returns></returns>
public bool AddConnectionString(string connectionString)
public bool AddConnectionString(string connectionString, string readNodeName)
{
var acquired = Monitor.TryEnter(slock,TimeSpan.FromSeconds(3));
var acquired = Monitor.TryEnter(slock, TimeSpan.FromSeconds(3));
if (!acquired)
throw new ShardingCoreInvalidOperationException($"{nameof(AddConnectionString)} is busy");
try
{
ConnectionStrings.Add(connectionString);
Length = ConnectionStrings.Count;
ReadNodes.Add(new ReadNode(readNodeName?? Guid.NewGuid().ToString("n"), connectionString));
Length = ReadNodes.Count;
return true;
}
finally

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.ReadWriteConfigurations.Connectors.Abstractions;
namespace ShardingCore.Sharding.ReadWriteConfigurations
@ -17,19 +18,32 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
public class ReadWriteLoopConnector: AbstractionReadWriteConnector
{
private long _seed = 0;
public ReadWriteLoopConnector(string dataSourceName, IEnumerable<string> connectionStrings):base(dataSourceName,connectionStrings)
public ReadWriteLoopConnector(string dataSourceName, ReadNode[] readNodes) :base(dataSourceName, readNodes)
{
}
public override string DoGetConnectionString()
private string DoGetNoReadNameConnectionString()
{
if (Length == 1)
return ConnectionStrings[0];
return ReadNodes[0].ConnectionString;
var newValue = Interlocked.Increment(ref _seed);
var next = (int)(newValue % Length);
if (next < 0)
return ConnectionStrings[Math.Abs(next)];
return ConnectionStrings[next];
return ReadNodes[Math.Abs(next)].ConnectionString;
return ReadNodes[next].ConnectionString;
}
public override string DoGetConnectionString(string readNodeName)
{
if (readNodeName == null)
{
return DoGetNoReadNameConnectionString();
}
else
{
return ReadNodes.FirstOrDefault(o => o.Name == readNodeName)?.ConnectionString ??
throw new ShardingCoreInvalidOperationException($"read node name :[{readNodeName}] not found");
}
}
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ShardingCore.Exceptions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ReadWriteConfigurations.Connectors.Abstractions;
@ -16,17 +17,28 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
*/
public class ReadWriteRandomConnector:AbstractionReadWriteConnector
{
public ReadWriteRandomConnector(string dataSourceName,IEnumerable<string> connectionStrings):base(dataSourceName,connectionStrings)
public ReadWriteRandomConnector(string dataSourceName,ReadNode[] readNodes):base(dataSourceName, readNodes)
{
}
public override string DoGetConnectionString()
private string DoGetNoReadNameConnectionString()
{
if (Length == 1)
return ConnectionStrings[0];
return ReadNodes[0].ConnectionString;
var next = RandomHelper.Next(0, Length);
return ConnectionStrings[next];
return ReadNodes[next].ConnectionString;
}
public override string DoGetConnectionString(string readNodeName)
{
if (readNodeName == null)
{
return DoGetNoReadNameConnectionString();
}else
{
return ReadNodes.FirstOrDefault(o => o.Name == readNodeName)?.ConnectionString ??
throw new ShardingCoreInvalidOperationException($"read node name :[{readNodeName}] not found");
}
}
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.ReadWriteConfigurations
{
/// <summary>
///
/// </summary>
/// Author: xjm
/// Created: 2022/5/11 8:31:38
/// Email: 326308290@qq.com
public class ReadNode
{
/// <summary>
/// 当前读库节点名称
/// </summary>
public string Name { get; }
/// <summary>
/// 当前读库链接的连接字符串
/// </summary>
public string ConnectionString { get; }
public ReadNode(string name,string connectionString)
{
Name = name??throw new ArgumentNullException("read node name is null");
ConnectionString = connectionString;
}
}
}

View File

@ -16,7 +16,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ReadWriteConnectionStringManager: IConnectionStringManager, IReadWriteAppendConnectionString
public class ReadWriteConnectionStringManager: IConnectionStringManager, IReadWriteConnectionStringManager
{
private IShardingConnectionStringResolver _shardingConnectionStringResolver;
private readonly IVirtualDataSource _virtualDataSource;
@ -26,20 +26,25 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
{
_virtualDataSource = virtualDataSource;
var readWriteConnectorFactory = ShardingContainer.GetService<IReadWriteConnectorFactory>();
var readWriteConnectors = virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o=> readWriteConnectorFactory.CreateConnector(virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key,o.Value));
var readWriteConnectors = virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o=> readWriteConnectorFactory.CreateConnector(virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key,o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
public string GetConnectionString(string dataSourceName)
{
if (!_shardingConnectionStringResolver.ContainsReadWriteDataSourceName(dataSourceName))
return _virtualDataSource.GetConnectionString(dataSourceName);
return _shardingConnectionStringResolver.GetConnectionString(dataSourceName);
return GetReadNodeConnectionString(dataSourceName,null);
}
public bool AddReadConnectionString(string dataSourceName, string connectionString)
public string GetReadNodeConnectionString(string dataSourceName, string readNodeName)
{
return _shardingConnectionStringResolver.AddConnectionString(dataSourceName, connectionString);
if (!_shardingConnectionStringResolver.ContainsReadWriteDataSourceName(dataSourceName))
return _virtualDataSource.GetConnectionString(dataSourceName);
return _shardingConnectionStringResolver.GetConnectionString(dataSourceName, readNodeName);
}
public bool AddReadConnectionString(string dataSourceName, string connectionString, string readNodeName)
{
return _shardingConnectionStringResolver.AddConnectionString(dataSourceName, connectionString, readNodeName);
}
}
}

View File

@ -12,16 +12,16 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
{
public class ReadWriteConnectorFactory: IReadWriteConnectorFactory
{
public IReadWriteConnector CreateConnector(ReadStrategyEnum strategy,string dataSourceName, IEnumerable<string> connectionStrings)
public IReadWriteConnector CreateConnector(ReadStrategyEnum strategy,string dataSourceName, ReadNode[] readNodes)
{
if (strategy == ReadStrategyEnum.Loop)
{
return new ReadWriteLoopConnector(dataSourceName, connectionStrings);
return new ReadWriteLoopConnector(dataSourceName, readNodes);
}
else if (strategy == ReadStrategyEnum.Random)
{
return new ReadWriteRandomConnector(dataSourceName, connectionStrings);
return new ReadWriteRandomConnector(dataSourceName, readNodes);
}
else
{

View File

@ -35,27 +35,26 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
return _connectors.ContainsKey(dataSourceName);
}
public string GetConnectionString(string dataSourceName)
public string GetConnectionString(string dataSourceName, string readNodeName)
{
if (!_connectors.TryGetValue(dataSourceName, out var connector))
throw new ShardingCoreInvalidOperationException($"read write connector not found, data source name:[{dataSourceName}]");
return connector.GetConnectionString();
return connector.GetConnectionString(readNodeName);
}
public bool AddConnectionString(string dataSourceName, string connectionString)
public bool AddConnectionString(string dataSourceName, string connectionString, string readNodeName)
{
if (!_connectors.TryGetValue(dataSourceName, out var connector))
{
connector = _readWriteConnectorFactory.CreateConnector(_readStrategy,
dataSourceName, new List<string>()
dataSourceName, new ReadNode[]
{
connectionString
new ReadNode(readNodeName??Guid.NewGuid().ToString("n"),connectionString)
});
return _connectors.TryAdd(dataSourceName, connector);
}
else
{
return connector.AddConnectionString(connectionString);
return connector.AddConnectionString(connectionString, readNodeName);
}
}
}

View File

@ -15,16 +15,43 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
{
public bool DefaultReadEnable { get; set; }
public int DefaultPriority { get; set; }
private readonly Dictionary<string /*数据源*/, string /*数据源对应的读节点名称*/> _dataSourceReadNode;
private ShardingReadWriteContext()
{
DefaultReadEnable = false;
DefaultPriority = 0;
_dataSourceReadNode = new Dictionary<string, string>();
}
public static ShardingReadWriteContext Create()
{
return new ShardingReadWriteContext();
}
/// <summary>
/// 添加数据源对应读节点获取名称
/// </summary>
/// <param name="dataSource"></param>
/// <param name="readNodeName"></param>
/// <returns></returns>
public bool AddDataSourceReadNode(string dataSource, string readNodeName)
{
if (_dataSourceReadNode.ContainsKey(dataSource))
{
return false;
}
_dataSourceReadNode.Add(dataSource, readNodeName);
return true;
}
/// <summary>
/// 尝试获取对应数据源的读节点名称
/// </summary>
/// <param name="dataSource"></param>
/// <param name="readNodeName"></param>
/// <returns></returns>
public bool TryGetDataSourceReadNode(string dataSource,out string readNodeName)
{
return _dataSourceReadNode.TryGetValue(dataSource, out readNodeName);
}
}
}

View File

@ -79,7 +79,7 @@ namespace ShardingCore.Test
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
_tableRouteRuleEngineFactory = tableRouteRuleEngineFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]
@ -229,9 +229,9 @@ namespace ShardingCore.Test
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
var succeedAddConnectionString = _shardingConnectionStringResolver.AddConnectionString("X", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;");
var succeedAddConnectionString = _shardingConnectionStringResolver.AddConnectionString("X", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;",null);
Assert.True(succeedAddConnectionString);
var connectionString = _shardingConnectionStringResolver.GetConnectionString("X");
var connectionString = _shardingConnectionStringResolver.GetConnectionString("X", null);
Assert.Equal("Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;", connectionString);
}

View File

@ -66,7 +66,7 @@ namespace ShardingCore.Test
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -73,7 +73,7 @@ namespace ShardingCore.Test2x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -66,7 +66,7 @@ namespace ShardingCore.Test2x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -73,7 +73,7 @@ namespace ShardingCore.Test3x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -67,7 +67,7 @@ namespace ShardingCore.Test3x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -73,7 +73,7 @@ namespace ShardingCore.Test5x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}
[Fact]

View File

@ -67,7 +67,7 @@ namespace ShardingCore.Test5x
_shardingReadWriteManager = shardingReadWriteManager;
_routeTailFactory = routeTailFactory;
_readWriteConnectorFactory = readWriteConnectorFactory;
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
var readWriteConnectors = _virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o => readWriteConnectorFactory.CreateConnector(_virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key, o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, _virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
}