优化代码支持每个分片上下文独立使用自己的日志

This commit is contained in:
xuejiaming 2022-07-06 10:39:28 +08:00
parent 65b201c9fc
commit 7148d7c5b7
31 changed files with 191 additions and 205 deletions

View File

@ -21,12 +21,12 @@ builder.Services.AddControllers();
builder.Services.AddShardingDbContext<DefaultDbContext>()
.AddEntityConfig(o =>
{
o.ThrowIfQueryRouteNotMatch = false;
// o.AddShardingTableRoute<OrderByHourRoute>();
// o.AddShardingTableRoute<AreaDeviceRoute>();
})
.AddConfig(o =>
{
o.ThrowIfQueryRouteNotMatch = false;
o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
o.UseShardingQuery((conn, b) =>
{

View File

@ -35,13 +35,13 @@ namespace Sample.SqlServer
services.AddShardingDbContext<DefaultShardingDbContext>()
.UseRouteConfig(o =>
{
o.ThrowIfQueryRouteNotMatch = false;
o.AddShardingTableRoute<SysUserModVirtualTableRoute>();
o.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
o.AddShardingTableRoute<TestYearShardingVirtualTableRoute>();
})
.UseConfig((sp,op) =>
{
op.ThrowIfQueryRouteNotMatch = false;
op.MaxQueryConnectionsLimit = 5;
op.UseSqlServer(builder =>
{

View File

@ -73,13 +73,13 @@ namespace Sample.SqlServerShardingTable
// },ReadStrategyEnum.Loop,defaultEnable:true).End();
services.AddShardingDbContext<MyDbContext>().AddEntityConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
op.AddShardingTableRoute<SysUserVirtualTableRoute>();
op.AddShardingTableRoute<OrderVirtualTableRoute>();
op.AddShardingTableRoute<MultiShardingOrderVirtualTableRoute>();
}).AddConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
op.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);

View File

@ -31,7 +31,6 @@ namespace ShardingCore.Bootstrappers
/// <typeparam name="TEntity"></typeparam>
public class EntityMetadataInitializer<TEntity>: IEntityMetadataInitializer where TEntity:class
{
private static readonly ILogger<EntityMetadataInitializer<TEntity>> _logger=ShardingLoggerFactory.CreateLogger<EntityMetadataInitializer<TEntity>>();
private readonly Type _shardingEntityType;
private readonly IShardingProvider _shardingProvider;
private readonly IShardingRouteConfigOptions _shardingRouteConfigOptions;

View File

@ -7,6 +7,7 @@ using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.ShardingConfigurations.Abstractions;
using ShardingCore.Core.ShardingMigrations.Abstractions;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.TrackerManagers;
@ -28,6 +29,7 @@ namespace ShardingCore.Core.RuntimeContexts
{
IShardingProvider GetShardingProvider();
ShardingConfigOptions GetShardingConfigOptions();
IShardingRouteConfigOptions GetShardingRouteConfigOptions();
IShardingMigrationManager GetShardingMigrationManager();
IShardingComparer GetShardingComparer();
IShardingCompilerExecutor GetShardingCompilerExecutor();
@ -53,9 +55,6 @@ namespace ShardingCore.Core.RuntimeContexts
void GetOrCreateShardingRuntimeModel(DbContext dbContext);
void UseLogfactory(ILoggerFactory loggerFactory);
void UseApplicationServiceProvider(IServiceProvider applicationServiceProvider);
void Initialize();
void AutoShardingCreate();
object GetService(Type serviceType);

View File

@ -9,6 +9,7 @@ using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.ShardingConfigurations.Abstractions;
using ShardingCore.Core.ShardingMigrations.Abstractions;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.TrackerManagers;
@ -38,7 +39,7 @@ namespace ShardingCore.Core.RuntimeContexts
private IServiceCollection _serviceMap = new ServiceCollection();
private IServiceProvider _serviceProvider;
private IServiceProvider _applicationServiceProvider;
// private ILoggerFactory _applicationLoggerFactory;
public void AddServiceConfig(Action<IServiceCollection> configure)
{
@ -80,6 +81,12 @@ namespace ShardingCore.Core.RuntimeContexts
}
private IShardingRouteConfigOptions _shardingRouteConfigOptions;
public IShardingRouteConfigOptions GetShardingRouteConfigOptions()
{
return _shardingRouteConfigOptions??= GetRequiredService<IShardingRouteConfigOptions>();
}
private IShardingMigrationManager _shardingMigrationManager;
public IShardingMigrationManager GetShardingMigrationManager()
{
@ -247,16 +254,13 @@ namespace ShardingCore.Core.RuntimeContexts
}
}
}
//
// public void UseLogfactory(ILoggerFactory loggerFactory)
// {
// // ShardingLoggerFactory.DefaultFactory = loggerFactory;
// _applicationLoggerFactory = loggerFactory;
// }
public void UseLogfactory(ILoggerFactory loggerFactory)
{
ShardingLoggerFactory.DefaultFactory = loggerFactory;
}
public void UseApplicationServiceProvider(IServiceProvider applicationServiceProvider)
{
_applicationServiceProvider = applicationServiceProvider;
}
private void CheckIfBuild()
{
@ -312,6 +316,7 @@ namespace ShardingCore.Core.RuntimeContexts
{
GetShardingProvider();
GetShardingConfigOptions();
GetShardingRouteConfigOptions();
GetShardingMigrationManager();
GetShardingComparer();
GetShardingCompilerExecutor();

View File

@ -10,39 +10,6 @@ namespace ShardingCore.Core.ShardingConfigurations.Abstractions
{
public interface IShardingRouteConfigOptions
{
/// <summary>
/// 当查询遇到没有路由被命中时是否抛出错误
/// </summary>
bool ThrowIfQueryRouteNotMatch { get; set; }
// /// <summary>
// /// 如果数据库不存在就创建并且创建表除了分表的
// /// </summary>
// bool EnsureCreatedWithOutShardingTable { get; set; }
//
// /// <summary>
// /// 是否需要在启动时创建分表
// /// </summary>
// bool? CreateShardingTableOnStart { get; set; }
// /// <summary>
// /// 是否在启动时创建数据库
// /// </summary>
// public bool? CreateDataBaseOnlyOnStart { get; set; }
/// <summary>
/// 忽略建表时的错误
/// </summary>
bool? IgnoreCreateTableError { get; set; }
// ///// <summary>
// ///// 是否启用分表路由编译缓存(默认只缓存单个操作的也就是<![CDATA[=,>,>=,<,<=]]>)
// ///// default cache single filter route expression, <![CDATA[=,>,>=,<,<=]]> with sharding property
// ///// </summary>
// //bool? EnableTableRouteCompileCache { get; set; }
// ///// <summary>
// ///// 是否启用分库路由编译缓存(默认只缓存单个操作的也就是<![CDATA[=,>,>=,<,<=]]>)
// ///// default cache single filter route expression, <![CDATA[=,>,>=,<,<=]]> with sharding property
// ///// </summary>
// //bool? EnableDataSourceRouteCompileCache { get; set; }
/// <summary>
/// 添加分库路由
/// </summary>

View File

@ -9,6 +9,15 @@ namespace ShardingCore.Core.ShardingConfigurations
{
public class ShardingConfigOptions
{
/// <summary>
/// 当查询遇到没有路由被命中时是否抛出错误
/// </summary>
public bool ThrowIfQueryRouteNotMatch { get; set; } = true;
/// <summary>
/// 忽略建表时的错误
/// </summary>
public bool? IgnoreCreateTableError { get; set; } = false;
/// <summary>
/// 配置全局迁移最大并行数,以data source为一个单元并行迁移保证在多数据库分库情况下可以大大提高性能
/// </summary>

View File

@ -21,37 +21,6 @@ namespace ShardingCore.Core.ShardingConfigurations
private readonly IDictionary<Type, Type> _virtualTableRoutes = new Dictionary<Type, Type>();
private readonly ISet<ParallelTableGroupNode> _parallelTables = new HashSet<ParallelTableGroupNode>();
// /// <summary>
// /// 如果数据库不存在就创建并且创建表除了分表的
// /// </summary>
// public bool EnsureCreatedWithOutShardingTable { get; set; }
//
// /// <summary>
// /// 是否需要在启动时创建分表
// /// </summary>
// public bool? CreateShardingTableOnStart { get; set; }
// /// <summary>
// /// 是否在启动时创建数据库
// /// </summary>
// public bool? CreateDataBaseOnlyOnStart { get; set; }
// /// <summary>
// /// 当查询遇到没有路由被命中时是否抛出错误
// /// </summary>
// public bool ThrowIfQueryRouteNotMatch { get; set; } = true;
// ///// <summary>
// ///// 全局启用分表路由表达式缓存,仅缓存单个表达式
// ///// </summary>
// //public bool? EnableTableRouteCompileCache { get; set; }
// ///// <summary>
// ///// 全局启用分库路由表达式缓存,仅缓存单个表达式
// ///// </summary>
// //public bool? EnableDataSourceRouteCompileCache { get; set; }
/// <summary>
/// 忽略建表时的错误
/// </summary>
public bool? IgnoreCreateTableError { get; set; } = false;
public bool ThrowIfQueryRouteNotMatch { get; set; } = true;
/// <summary>
/// 添加分库路由
@ -154,28 +123,5 @@ namespace ShardingCore.Core.ShardingConfigurations
{
return _parallelTables;
}
// /// <summary>
// /// 仅内部DbContext生效的配置委托
// /// </summary>
// public Action<DbContextOptionsBuilder> ExecutorDbContextConfigure { get; private set; }
// public Action<DbContextOptionsBuilder> ShellDbContextConfigure { get; private set; }
//
// /// <summary>
// /// 仅内部真实DbContext配置的方法
// /// </summary>
// /// <param name="executorDbContextConfigure"></param>
// /// <exception cref="ArgumentNullException"></exception>
// public void UseExecutorDbContextConfigure(Action<DbContextOptionsBuilder> executorDbContextConfigure)
// {
// ExecutorDbContextConfigure = executorDbContextConfigure ?? throw new ArgumentNullException(nameof(executorDbContextConfigure));
// }
//
// public void UseShellDbContextConfigure(Action<DbContextOptionsBuilder> shellDbContextConfigure)
// {
// ShellDbContextConfigure = shellDbContextConfigure ?? throw new ArgumentNullException(nameof(shellDbContextConfigure));
// }
}
}

View File

@ -44,6 +44,15 @@ namespace ShardingCore.Core.VirtualRoutes.Abstractions
List<TableRouteUnit> RouteTo(Type entityType,
ShardingTableRouteConfig shardingTableRouteConfig);
/// <summary>
/// 直接路由采用默认数据源
/// </summary>
/// <param name="entityType"></param>
/// <param name="shardingTableRouteConfig"></param>
/// <param name="dataSourceName"></param>
/// <returns></returns>
List<TableRouteUnit> RouteTo(Type entityType, string dataSourceName,
ShardingTableRouteConfig shardingTableRouteConfig);
/// <summary>
/// 根据数据源路由进行分片路由
/// </summary>
/// <param name="entityType"></param>

View File

@ -18,11 +18,13 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes
public class TableRouteManager : ITableRouteManager
{
private readonly IVirtualDataSource _virtualDataSource;
private readonly IDataSourceRouteManager _dataSourceRouteManager;
private readonly ConcurrentDictionary<Type, IVirtualTableRoute> _tableRoutes = new();
public TableRouteManager(IVirtualDataSource virtualDataSource)
public TableRouteManager(IVirtualDataSource virtualDataSource,IDataSourceRouteManager dataSourceRouteManager)
{
_virtualDataSource = virtualDataSource;
_dataSourceRouteManager = dataSourceRouteManager;
}
public bool HasRoute(Type entityType)
{
@ -53,7 +55,11 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes
public List<TableRouteUnit> RouteTo(Type entityType, ShardingTableRouteConfig shardingTableRouteConfig)
{
return RouteTo(entityType, _virtualDataSource.DefaultDataSourceName, shardingTableRouteConfig);
}
public List<TableRouteUnit> RouteTo(Type entityType, string dataSourceName, ShardingTableRouteConfig shardingTableRouteConfig)
{
var dataSourceRouteResult = new DataSourceRouteResult(_virtualDataSource.DefaultDataSourceName);
return RouteTo(entityType, dataSourceRouteResult, shardingTableRouteConfig);
}

View File

@ -11,6 +11,7 @@ using System.Collections.Generic;
using System.Threading;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Exceptions;
@ -23,12 +24,11 @@ namespace ShardingCore.DynamicDataSources
{
public class DataSourceInitializer : IDataSourceInitializer
{
private static readonly ILogger<DataSourceInitializer> _logger =
ShardingLoggerFactory.CreateLogger<DataSourceInitializer>();
private readonly ILogger<DataSourceInitializer> _logger ;
private readonly IShardingProvider _shardingProvider;
private readonly IDbContextCreator _dbContextCreator;
private readonly IShardingRouteConfigOptions _routeConfigOptions;
private readonly ShardingConfigOptions _shardingConfigOptions;
private readonly IVirtualDataSource _virtualDataSource;
private readonly IRouteTailFactory _routeTailFactory;
private readonly IDataSourceRouteManager _dataSourceRouteManager;
@ -40,18 +40,19 @@ namespace ShardingCore.DynamicDataSources
public DataSourceInitializer(
IShardingProvider shardingProvider,
IDbContextCreator dbContextCreator,
IShardingRouteConfigOptions routeConfigOptions,
ShardingConfigOptions shardingConfigOptions,
IVirtualDataSource virtualDataSource,
IRouteTailFactory routeTailFactory,
IDataSourceRouteManager dataSourceRouteManager,
ITableRouteManager tableRouteManager,
IEntityMetadataManager entityMetadataManager,
IShardingTableCreator shardingTableCreator,
ITableEnsureManager tableEnsureManager)
ITableEnsureManager tableEnsureManager,
ILogger<DataSourceInitializer> logger )
{
_shardingProvider = shardingProvider;
_dbContextCreator = dbContextCreator;
_routeConfigOptions = routeConfigOptions;
_shardingConfigOptions = shardingConfigOptions;
_virtualDataSource = virtualDataSource;
_routeTailFactory = routeTailFactory;
_dataSourceRouteManager = dataSourceRouteManager;
@ -59,6 +60,7 @@ namespace ShardingCore.DynamicDataSources
_entityMetadataManager = entityMetadataManager;
_tableCreator = shardingTableCreator;
_tableEnsureManager = tableEnsureManager;
_logger = logger;
}
public void InitConfigure(string dataSourceName,bool createDatabase,bool createTable)
@ -150,7 +152,7 @@ namespace ShardingCore.DynamicDataSources
}
catch (Exception e)
{
if (!_routeConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
if (!_shardingConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
{
_logger.LogWarning(e,
$"table :{physicTableName} will created.");

View File

@ -413,7 +413,7 @@ namespace ShardingCore.EFCores
if (primaryKeyValue != null)
{
var dataSourceName = GetDataSourceName(primaryKeyValue);
var tableTail = TableRouteManager.GetTableTail<TEntity>(primaryKeyValue);
var tableTail = TableRouteManager.GetTableTail<TEntity>(dataSourceName,primaryKeyValue);
var routeTail = _shardingRuntimeContext.GetRouteTailFactory().Create(tableTail);
;
return _context.GetShareDbContext(dataSourceName, routeTail);

View File

@ -27,8 +27,6 @@ namespace ShardingCore.EFCores
*/
public class ShardingModelCustomizer : ModelCustomizer
{
private static readonly ILogger<ShardingModelCustomizer> _logger =
ShardingLoggerFactory.CreateLogger<ShardingModelCustomizer>();
public ShardingModelCustomizer(ModelCustomizerDependencies dependencies) : base(dependencies)
{
@ -83,7 +81,6 @@ namespace ShardingCore.EFCores
var tableName = entityMetadata.LogicTableName;
if (string.IsNullOrWhiteSpace(tableName))
throw new ArgumentNullException($"{shardingEntity}: not found logic table name。");
_logger.LogDebug($"mapping table :[tableName]-->[{tableName}{tableSeparator}{tail}]");
entity.ToTable($"{tableName}{tableSeparator}{tail}");
}
}

View File

@ -0,0 +1,50 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.DynamicDataSources;
using ShardingCore.Exceptions;
namespace ShardingCore.Extensions
{
public static class ShardingRuntimeExtension
{
/// <summary>
/// 自动尝试补偿表
/// </summary>
/// <param name="shardingRuntimeContext"></param>
/// <param name="parallelCount"></param>
public static void UseAutoTryCompensateTable(this IShardingRuntimeContext shardingRuntimeContext, int? parallelCount = null)
{
shardingRuntimeContext.CheckRequirement();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var dataSourceInitializer = shardingRuntimeContext.GetDataSourceInitializer();
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
var compensateTableParallelCount = parallelCount ?? shardingConfigOptions.CompensateTableParallelCount;
if (compensateTableParallelCount <= 0)
{
throw new ShardingCoreInvalidOperationException($"compensate table parallel count must >0");
}
var allDataSourceNames = virtualDataSource.GetAllDataSourceNames();
var partitionMigrationUnits = allDataSourceNames.Partition(compensateTableParallelCount);
foreach (var migrationUnits in partitionMigrationUnits)
{
var migrateUnits = migrationUnits.Select(o => new InitConfigureUnit(o)).ToList();
ExecuteInitConfigureUnit(dataSourceInitializer, migrateUnits);
}
}
private static void ExecuteInitConfigureUnit(IDataSourceInitializer dataSourceInitializer,
List<InitConfigureUnit> initConfigureUnits)
{
var initConfigureTasks = initConfigureUnits.Select(o =>
{
return Task.Run(() => { dataSourceInitializer.InitConfigure(o.DataSourceName, true, true); });
}).ToArray();
Task.WaitAll(initConfigureTasks);
}
}
}

View File

@ -67,13 +67,13 @@ namespace ShardingCore.Extensions
public static string GetTableTail<TEntity>(this ITableRouteManager tableRouteManager,
public static string GetTableTail<TEntity>(this ITableRouteManager tableRouteManager,string dataSourceName,
TEntity entity) where TEntity : class
{
var shardingRouteUnit = tableRouteManager.RouteTo(entity.GetType(),new ShardingTableRouteConfig(shardingTable: entity))[0];
return shardingRouteUnit.Tail;
}
public static string GetTableTail<TEntity>(this ITableRouteManager tableRouteManager,
public static string GetTableTail<TEntity>(this ITableRouteManager tableRouteManager,string dataSourceName,
object shardingKeyValue) where TEntity : class
{
var shardingRouteUnit = tableRouteManager.RouteTo(typeof(TEntity),new ShardingTableRouteConfig(shardingKeyValue: shardingKeyValue))[0];

View File

@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace ShardingCore.Jobs.Abstaractions
{

View File

@ -19,8 +19,7 @@ namespace ShardingCore.Jobs
[ExcludeFromCodeCoverage]
internal class JobRunnerService
{
private static readonly ILogger<JobRunnerService> _logger =
ShardingLoggerFactory.CreateLogger<JobRunnerService>();
private readonly ILogger<JobRunnerService> _logger;
private readonly IJobManager _jobManager;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private const long DEFAULT_MILLIS = 1000L;
@ -30,9 +29,10 @@ namespace ShardingCore.Jobs
/// </summary>
private const long MAX_DELAY_MILLIS = 30000L;
public JobRunnerService(IJobManager jobManager)
public JobRunnerService(IJobManager jobManager,ILogger<JobRunnerService> logger)
{
_jobManager = jobManager;
_logger = logger;
}
public async Task StartAsync()

View File

@ -32,11 +32,10 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
*/
public class DataSourceDbContext : IDataSourceDbContext
{
private static readonly ILogger<DataSourceDbContext> _logger =
ShardingLoggerFactory.CreateLogger<DataSourceDbContext>();
private static readonly IComparer<string> _comparer = new NoShardingFirstComparer();
private readonly ILogger<DataSourceDbContext> _logger;
public Type DbContextType { get; }
/// <summary>
/// 当前是否是默认的dbcontext 也就是不分片的dbcontext
@ -118,7 +117,8 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
bool isDefault,
DbContext shardingShellDbContext,
IDbContextCreator dbContextCreator,
ActualConnectionStringManager actualConnectionStringManager)
ActualConnectionStringManager actualConnectionStringManager,
ILogger<DataSourceDbContext> logger)
{
var shardingDbContext = (IShardingDbContext)shardingShellDbContext;
DataSourceName = dataSourceName;
@ -130,6 +130,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
.GetVirtualDataSource();
_dbContextCreator = dbContextCreator;
_actualConnectionStringManager = actualConnectionStringManager;
this._logger = logger;
}
/// <summary>

View File

@ -6,6 +6,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
@ -31,6 +32,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// <typeparam name="TShardingDbContext"></typeparam>
public class ShardingDbContextExecutor : IShardingDbContextExecutor
{
private readonly ILoggerFactory _loggerFactory;
private readonly DbContext _shardingDbContext;
//private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
@ -71,6 +73,8 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
_entityMetadataManager = _shardingRuntimeContext.GetEntityMetadataManager();
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var shardingReadWriteManager = _shardingRuntimeContext.GetShardingReadWriteManager();
var shardingProvider = _shardingRuntimeContext.GetShardingProvider();
_loggerFactory=shardingProvider.GetService<ILoggerFactory>();
_actualConnectionStringManager = new ActualConnectionStringManager(shardingReadWriteManager,_virtualDataSource);
}
@ -78,7 +82,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
private IDataSourceDbContext GetDataSourceDbContext(string dataSourceName)
{
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext, _dbContextCreator, _actualConnectionStringManager));
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext, _dbContextCreator, _actualConnectionStringManager,_loggerFactory.CreateLogger<DataSourceDbContext>()));
}
/// <summary>
@ -116,7 +120,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
{
var dataSourceName = GetDataSourceName(entity);
var tail = GetTableTail(entity);
var tail = GetTableTail(dataSourceName,entity);
return CreateDbContext(CreateDbContextStrategyEnum.ShareConnection, dataSourceName, _routeTailFactory.Create(tail));
}
@ -131,11 +135,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
return _dataSourceRouteManager.GetDataSourceName(entity);
}
private string GetTableTail<TEntity>(TEntity entity) where TEntity : class
private string GetTableTail<TEntity>(string dataSourceName,TEntity entity) where TEntity : class
{
if (!_entityMetadataManager.IsShardingTable(entity.GetType()))
return string.Empty;
return _tableRouteManager.GetTableTail(entity);
return _tableRouteManager.GetTableTail(dataSourceName,entity);
}
#endregion

View File

@ -19,7 +19,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
/// </summary>
public class DefaultShardingCompilerExecutor: IShardingCompilerExecutor
{
private static readonly ILogger<DefaultShardingCompilerExecutor> _logger=ShardingLoggerFactory.CreateLogger<DefaultShardingCompilerExecutor>();
private readonly ILogger<DefaultShardingCompilerExecutor> _logger;
private readonly IShardingTrackQueryExecutor _shardingTrackQueryExecutor;
private readonly IQueryCompilerContextFactory _queryCompilerContextFactory;
private readonly IPrepareParser _prepareParser;
@ -27,12 +27,13 @@ namespace ShardingCore.Sharding.ShardingExecutors
public DefaultShardingCompilerExecutor(
IShardingTrackQueryExecutor shardingTrackQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory,IPrepareParser prepareParser,
IShardingRouteManager shardingRouteManager)
IShardingRouteManager shardingRouteManager,ILogger<DefaultShardingCompilerExecutor> logger)
{
_shardingTrackQueryExecutor = shardingTrackQueryExecutor;
_queryCompilerContextFactory = queryCompilerContextFactory;
_prepareParser = prepareParser;
_shardingRouteManager = shardingRouteManager;
_logger = logger;
}
public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{

View File

@ -31,7 +31,6 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
*/
public class DefaultShardingQueryExecutor : IShardingQueryExecutor
{
private static readonly ILogger<DefaultShardingQueryExecutor> _logger=ShardingLoggerFactory.CreateLogger<DefaultShardingQueryExecutor>();
private readonly IStreamMergeContextFactory _streamMergeContextFactory;

View File

@ -21,8 +21,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
private readonly IDataSourceRouteRuleEngineFactory _dataSourceRouteRuleEngineFactory;
private readonly ITableRouteRuleEngineFactory _tableRouteRuleEngineFactory;
private static readonly ILogger<QueryCompilerContextFactory> _logger =
ShardingLoggerFactory.CreateLogger<QueryCompilerContextFactory>();
private readonly ILogger<QueryCompilerContextFactory> _logger;
private static readonly IQueryableCombine _enumerableQueryableCombine;
private static readonly IQueryableCombine _allQueryableCombine;
private static readonly IQueryableCombine _constantQueryableCombine;
@ -38,10 +37,11 @@ namespace ShardingCore.Sharding.ShardingExecutors
_whereQueryableCombine = new WhereQueryableCombine();
}
public QueryCompilerContextFactory(IDataSourceRouteRuleEngineFactory dataSourceRouteRuleEngineFactory,ITableRouteRuleEngineFactory tableRouteRuleEngineFactory)
public QueryCompilerContextFactory(IDataSourceRouteRuleEngineFactory dataSourceRouteRuleEngineFactory,ITableRouteRuleEngineFactory tableRouteRuleEngineFactory,ILogger<QueryCompilerContextFactory> logger)
{
_dataSourceRouteRuleEngineFactory = dataSourceRouteRuleEngineFactory;
_tableRouteRuleEngineFactory = tableRouteRuleEngineFactory;
_logger = logger;
}
public IQueryCompilerContext Create(IPrepareParseResult prepareParseResult)

View File

@ -20,6 +20,7 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
@ -72,7 +73,7 @@ namespace ShardingCore.Sharding
public bool IsCrossTable => MergeQueryCompilerContext.IsCrossTable();
private readonly ITrackerManager _trackerManager;
private readonly IShardingRouteConfigOptions _shardingRouteConfigOptions;
private readonly ShardingConfigOptions _shardingConfigOptions;
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
@ -85,21 +86,18 @@ namespace ShardingCore.Sharding
public StreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext, IParseResult parseResult,
IRewriteResult rewriteResult, IOptimizeResult optimizeResult,
IRouteTailFactory routeTailFactory, ITrackerManager trackerManager,
IShardingRouteConfigOptions shardingRouteConfigOptions)
IRewriteResult rewriteResult, IOptimizeResult optimizeResult)
{
MergeQueryCompilerContext = mergeQueryCompilerContext;
ShardingRuntimeContext = ((DbContext)mergeQueryCompilerContext.GetShardingDbContext())
.GetRequireService<IShardingRuntimeContext>();
ParseResult = parseResult;
RewriteQueryable = rewriteResult.GetRewriteQueryable();
OptimizeResult = optimizeResult;
_rewriteResult = rewriteResult;
_routeTailFactory = routeTailFactory;
ShardingRuntimeContext = mergeQueryCompilerContext.GetShardingDbContext().GetShardingRuntimeContext();
_routeTailFactory = ShardingRuntimeContext.GetRouteTailFactory();
_trackerManager = ShardingRuntimeContext.GetTrackerManager();
_shardingConfigOptions = ShardingRuntimeContext.GetShardingConfigOptions();
QueryEntities = MergeQueryCompilerContext.GetQueryEntities().Keys.ToHashSet();
_trackerManager = trackerManager;
_shardingRouteConfigOptions = shardingRouteConfigOptions;
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
Orders = parseResult.GetOrderByContext().PropertyOrders.ToArray();
Skip = parseResult.GetPaginationContext().Skip;
@ -349,7 +347,7 @@ namespace ShardingCore.Sharding
private bool ThrowIfQueryRouteNotMatch()
{
return _shardingRouteConfigOptions.ThrowIfQueryRouteNotMatch;
return _shardingConfigOptions.ThrowIfQueryRouteNotMatch;
}
public bool UseUnionAllMerge()

View File

@ -22,23 +22,17 @@ namespace ShardingCore.Sharding
*/
public class StreamMergeContextFactory : IStreamMergeContextFactory
{
private readonly IRouteTailFactory _routeTailFactory;
private readonly IQueryableParseEngine _queryableParseEngine;
private readonly IQueryableRewriteEngine _queryableRewriteEngine;
private readonly IQueryableOptimizeEngine _queryableOptimizeEngine;
private readonly ITrackerManager _trackerManager;
private readonly IShardingRouteConfigOptions _shardingRouteConfigOptions;
public StreamMergeContextFactory(IRouteTailFactory routeTailFactory
, IQueryableParseEngine queryableParseEngine, IQueryableRewriteEngine queryableRewriteEngine, IQueryableOptimizeEngine queryableOptimizeEngine,
ITrackerManager trackerManager,IShardingRouteConfigOptions shardingRouteConfigOptions)
public StreamMergeContextFactory(IQueryableParseEngine queryableParseEngine,
IQueryableRewriteEngine queryableRewriteEngine,
IQueryableOptimizeEngine queryableOptimizeEngine)
{
_routeTailFactory = routeTailFactory;
_queryableParseEngine = queryableParseEngine;
_queryableRewriteEngine = queryableRewriteEngine;
_queryableOptimizeEngine = queryableOptimizeEngine;
_trackerManager = trackerManager;
_shardingRouteConfigOptions = shardingRouteConfigOptions;
}
public StreamMergeContext Create(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
@ -47,7 +41,7 @@ namespace ShardingCore.Sharding
var rewriteResult = _queryableRewriteEngine.GetRewriteQueryable(mergeQueryCompilerContext, parseResult);
var optimizeResult = _queryableOptimizeEngine.Optimize(mergeQueryCompilerContext, parseResult, rewriteResult);
CheckMergeContext(mergeQueryCompilerContext, parseResult, rewriteResult, optimizeResult);
return new StreamMergeContext(mergeQueryCompilerContext, parseResult, rewriteResult,optimizeResult, _routeTailFactory,_trackerManager,_shardingRouteConfigOptions);
return new StreamMergeContext(mergeQueryCompilerContext, parseResult, rewriteResult,optimizeResult);
}
private void CheckMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,IParseResult parseResult,IRewriteResult rewriteResult,IOptimizeResult optimizeResult)

View File

@ -252,32 +252,7 @@ namespace ShardingCore
public static void UseAutoTryCompensateTable(this IServiceProvider serviceProvider, int? parallelCount = null)
{
var shardingRuntimeContext = serviceProvider.GetRequiredService<IShardingRuntimeContext>();
shardingRuntimeContext.CheckRequirement();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var dataSourceInitializer = shardingRuntimeContext.GetDataSourceInitializer();
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
var compensateTableParallelCount = parallelCount ?? shardingConfigOptions.CompensateTableParallelCount;
if (compensateTableParallelCount <= 0)
{
throw new ShardingCoreInvalidOperationException($"compensate table parallel count must >0");
}
var allDataSourceNames = virtualDataSource.GetAllDataSourceNames();
var partitionMigrationUnits = allDataSourceNames.Partition(compensateTableParallelCount);
foreach (var migrationUnits in partitionMigrationUnits)
{
var migrateUnits = migrationUnits.Select(o => new InitConfigureUnit(o)).ToList();
ExecuteInitConfigureUnit(dataSourceInitializer, migrateUnits);
}
}
private static void ExecuteInitConfigureUnit(IDataSourceInitializer dataSourceInitializer,
List<InitConfigureUnit> initConfigureUnits)
{
var initConfigureTasks = initConfigureUnits.Select(o =>
{
return Task.Run(() => { dataSourceInitializer.InitConfigure(o.DataSourceName, true, true); });
}).ToArray();
Task.WaitAll(initConfigureTasks);
shardingRuntimeContext.UseAutoTryCompensateTable(parallelCount);
}

View File

@ -73,8 +73,6 @@ namespace ShardingCore
public IShardingRuntimeContext Build(IServiceProvider appServiceProvider, ILoggerFactory loggerFactory)
{
var shardingRuntimeContext = new ShardingRuntimeContext();
shardingRuntimeContext.UseApplicationServiceProvider(appServiceProvider);
shardingRuntimeContext.UseLogfactory(loggerFactory);
shardingRuntimeContext.AddServiceConfig(services =>
{
// services.AddSingleton<IDbContextTypeCollector>(sp => new DbContextTypeCollector<TShardingDbContext>());
@ -94,6 +92,12 @@ namespace ShardingCore
shardingConfigOptions.CheckArguments();
return shardingConfigOptions;
});
services.AddLogging();
if (loggerFactory != null)
{
services.Replace(ServiceDescriptor.Singleton<ILoggerFactory>(sp => loggerFactory));
}
services.AddSingleton<IShardingProvider>(sp => new ShardingProvider(sp,appServiceProvider));
services.AddInternalShardingCore<TShardingDbContext>();
foreach (var serviceAction in _serviceActions)

View File

@ -12,6 +12,7 @@ using System;
using System.Threading;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Logger;
using ShardingCore.Sharding;
@ -25,20 +26,21 @@ namespace ShardingCore.TableCreator
*/
public class ShardingTableCreator : IShardingTableCreator
{
private static readonly ILogger<ShardingTableCreator> _logger =
ShardingLoggerFactory.CreateLogger<ShardingTableCreator>();
private readonly ILogger<ShardingTableCreator> _logger;
private readonly IShardingProvider _shardingProvider;
private readonly IShardingRouteConfigOptions _routeConfigOptions;
private readonly ShardingConfigOptions _shardingConfigOptions;
private readonly IRouteTailFactory _routeTailFactory;
private readonly IDbContextCreator _dbContextCreator;
public ShardingTableCreator(IShardingProvider shardingProvider,IShardingRouteConfigOptions routeConfigOptions, IRouteTailFactory routeTailFactory,IDbContextCreator dbContextCreator)
public ShardingTableCreator(IShardingProvider shardingProvider,ShardingConfigOptions shardingConfigOptions, IRouteTailFactory routeTailFactory,IDbContextCreator dbContextCreator,
ILogger<ShardingTableCreator> logger)
{
_shardingProvider = shardingProvider;
_routeConfigOptions = routeConfigOptions;
_shardingConfigOptions = shardingConfigOptions;
_routeTailFactory = routeTailFactory;
_dbContextCreator = dbContextCreator;
_logger = logger;
}
public void CreateTable<T>(string dataSourceName, string tail)
@ -72,7 +74,7 @@ namespace ShardingCore.TableCreator
}
catch (Exception ex)
{
if (!_routeConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
if (!_shardingConfigOptions.IgnoreCreateTableError.GetValueOrDefault())
{
_logger.LogWarning(ex,
$"create table error entity name:[{shardingEntityType.Name}].");

View File

@ -29,8 +29,6 @@ namespace ShardingCore.VirtualRoutes.Abstractions
{
private static readonly object APPEND_LOCK = new object();
private static readonly ILogger<AbstractShardingAutoCreateOperatorVirtualTableRoute<TEntity, TKey>> _logger =
ShardingLoggerFactory.CreateLogger<AbstractShardingAutoCreateOperatorVirtualTableRoute<TEntity, TKey>>();
private readonly SafeReadAppendList<string> _tails = new SafeReadAppendList<string>();
@ -103,7 +101,9 @@ namespace ShardingCore.VirtualRoutes.Abstractions
public virtual Task ExecuteAsync()
{
_logger.LogDebug($"get {typeof(TEntity).Name}'s route execute job ");
var logger=RouteShardingProvider
.GetService<ILogger<AbstractShardingAutoCreateOperatorVirtualTableRoute<TEntity, TKey>>>();
logger.LogDebug($"get {typeof(TEntity).Name}'s route execute job ");
var entityMetadataManager = RouteShardingProvider.GetRequiredService<IEntityMetadataManager>();
var tableCreator = RouteShardingProvider.GetRequiredService<IShardingTableCreator>();
@ -127,22 +127,22 @@ namespace ShardingCore.VirtualRoutes.Abstractions
dataSources.Add(virtualDataSource.DefaultDataSourceName);
}
_logger.LogInformation($"auto create table data source names:[{string.Join(",", dataSources)}]");
logger.LogInformation($"auto create table data source names:[{string.Join(",", dataSources)}]");
foreach (var dataSource in dataSources)
{
try
{
_logger.LogInformation($"begin table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
logger.LogInformation($"begin table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
tableCreator.CreateTable(dataSource, typeof(TEntity), tail);
_logger.LogInformation($"succeed table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
logger.LogInformation($"succeed table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
}
catch (Exception e)
{
//ignore
_logger.LogInformation($"warning table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
logger.LogInformation($"warning table tail:[{tail}],entity:[{typeof(TEntity).Name}]");
if (DoLogError)
_logger.LogError(e, $"{dataSource} {typeof(TEntity).Name}'s create table error ");
logger.LogError(e, $"{dataSource} {typeof(TEntity).Name}'s create table error ");
}
}

View File

@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ShardingCore.Bootstrappers;
using ShardingCore.Core;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.TableExists;
@ -38,9 +39,6 @@ namespace ShardingCore.Test
services.AddShardingDbContext<ShardingDefaultDbContext>()
.UseRouteConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
op.AddShardingDataSourceRoute<OrderAreaShardingVirtualDataSourceRoute>();
op.AddShardingTableRoute<SysUserModVirtualTableRoute>();
op.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
@ -58,17 +56,32 @@ namespace ShardingCore.Test
})
.UseConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
//忽略建表错误compensate table和table creator
op.IgnoreCreateTableError = true;
//迁移时使用的并行线程数(分库有效)defaultShardingDbContext.Database.Migrate()
op.MigrationParallelCount = Environment.ProcessorCount;
//补偿表创建并行线程数 调用UseAutoTryCompensateTable有效
op.CompensateTableParallelCount = Environment.ProcessorCount;
//最大连接数限制
op.MaxQueryConnectionsLimit = Environment.ProcessorCount;
//链接模式系统默认
op.ConnectionMode = ConnectionModeEnum.SYSTEM_AUTO;
//如何通过字符串查询创建DbContext
op.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);
});
//如何通过事务创建DbContext
op.UseShardingTransaction((connection, builder) =>
{
builder.UseSqlServer(connection).UseLoggerFactory(efLogger);
});
//添加默认数据源
op.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=ShardingCoreDBA;Integrated Security=True;");
//添加额外数据源
op.AddExtraDataSource(sp =>
{
return new Dictionary<string, string>()
@ -77,6 +90,7 @@ namespace ShardingCore.Test
{ "C", "Data Source=localhost;Initial Catalog=ShardingCoreDBC;Integrated Security=True;" },
};
});
//添加读写分离
op.AddReadWriteSeparation(sp =>
{
return new Dictionary<string, IEnumerable<string>>()
@ -89,13 +103,17 @@ namespace ShardingCore.Test
}
};
}, ReadStrategyEnum.Loop, defaultEnable: false, readConnStringGetStrategy: ReadConnStringGetStrategyEnum.LatestEveryTime);
}).ReplaceService<ITableEnsureManager,SqlServerTableEnsureManager>(ServiceLifetime.Singleton).AddShardingCore();
})
.ReplaceService<ITableEnsureManager,SqlServerTableEnsureManager>()
.AddShardingCore();
}
// 可以添加要用到的方法参数,会自动从注册的服务中获取服务实例,类似于 asp.net core 里 Configure 方法
public void Configure(IServiceProvider serviceProvider)
{
//启动ShardingCore创建表任务
serviceProvider.UseAutoShardingCreate();
//启动进行表补偿
serviceProvider.UseAutoTryCompensateTable();
// 有一些测试数据要初始化可以放在这里
InitData(serviceProvider).GetAwaiter().GetResult();

View File

@ -38,8 +38,6 @@ namespace ShardingCore.Test2x
services.AddShardingDbContext<ShardingDefaultDbContext>()
.UseRouteConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
op.AddShardingDataSourceRoute<OrderAreaShardingVirtualDataSourceRoute>();
op.AddShardingTableRoute<SysUserModVirtualTableRoute>();
op.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
@ -58,6 +56,8 @@ namespace ShardingCore.Test2x
.UseConfig(op =>
{
//当无法获取路由时会返回默认值而不是报错
op.ThrowIfQueryRouteNotMatch = false;
op.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);