针对dbcontext的获取创建提供了多种模式,包括共享链接,独立写链接,独立读链接,并且针对Migration有并行的方式处理

This commit is contained in:
xuejiaming 2022-07-05 09:02:21 +08:00
parent bcb0d94962
commit 2bf457ceaa
19 changed files with 218 additions and 265 deletions

View File

@ -7,6 +7,7 @@ using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.ShardingMigrations.Abstractions;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
@ -27,6 +28,7 @@ namespace ShardingCore.Core.RuntimeContexts
{
IShardingProvider GetShardingProvider();
ShardingConfigOptions GetShardingConfigOptions();
IShardingMigrationManager GetShardingMigrationManager();
IShardingComparer GetShardingComparer();
IShardingCompilerExecutor GetShardingCompilerExecutor();
IShardingReadWriteManager GetShardingReadWriteManager();

View File

@ -9,6 +9,7 @@ using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.ShardingMigrations.Abstractions;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
@ -79,6 +80,13 @@ namespace ShardingCore.Core.RuntimeContexts
}
private IShardingMigrationManager _shardingMigrationManager;
public IShardingMigrationManager GetShardingMigrationManager()
{
return _shardingMigrationManager??= GetRequiredService<IShardingMigrationManager>();
}
private IShardingComparer _shardingComparer;
public IShardingComparer GetShardingComparer()
{
@ -304,6 +312,7 @@ namespace ShardingCore.Core.RuntimeContexts
{
GetShardingProvider();
GetShardingConfigOptions();
GetShardingMigrationManager();
GetShardingComparer();
GetShardingCompilerExecutor();
GetShardingReadWriteManager();

View File

@ -9,6 +9,14 @@ namespace ShardingCore.Core.ShardingConfigurations
{
public class ShardingConfigOptions
{
/// <summary>
/// 配置全局迁移最大并行数,以data source为一个单元并行迁移保证在多数据库分库情况下可以大大提高性能
/// </summary>
public int MigrationParallelCount { get; set; }= Environment.ProcessorCount;
/// <summary>
/// 启动补偿表的最大并行数,以data source为一个单元并行迁移保证在多数据库分库情况下可以大大提高性能
/// </summary>
public int CompensateTableParallelCount { get; set; }= Environment.ProcessorCount;
/// <summary>
/// 全局配置最大的查询连接数限制,默认系统逻辑处理器<code>Environment.ProcessorCount</code>
/// </summary>

View File

@ -115,7 +115,7 @@ namespace ShardingCore.DynamicDataSources
if (context is IShardingDbContext shardingDbContext)
{
using (var dbContext =
shardingDbContext.GetDbContext(dataSourceName, true,
shardingDbContext.GetIndependentWriteDbContext(dataSourceName,
_routeTailFactory.Create(string.Empty, false)))
{
if (isDefault)

View File

@ -416,7 +416,7 @@ namespace ShardingCore.EFCores
var tableTail = TableRouteManager.GetTableTail<TEntity>(primaryKeyValue);
var routeTail = _shardingRuntimeContext.GetRouteTailFactory().Create(tableTail);
;
return _context.GetDbContext(dataSourceName, false, routeTail);
return _context.GetShareDbContext(dataSourceName, routeTail);
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -10,10 +11,13 @@ using Microsoft.EntityFrameworkCore.Migrations.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.ShardingMigrations.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
@ -24,12 +28,22 @@ namespace ShardingCore.EFCores
private readonly IShardingRuntimeContext _shardingRuntimeContext;
private readonly IVirtualDataSource _virtualDataSource;
private readonly ShardingConfigOptions _shardingConfigOptions;
private readonly IShardingProvider _shardingProvider;
private readonly IDbContextCreator _dbContextCreator;
private readonly IRouteTailFactory _routeTailFactory;
private readonly IShardingMigrationManager _shardingMigrationManager;
public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext,IMigrationsAssembly migrationsAssembly, IHistoryRepository historyRepository, IDatabaseCreator databaseCreator, IMigrationsSqlGenerator migrationsSqlGenerator, IRawSqlCommandBuilder rawSqlCommandBuilder, IMigrationCommandExecutor migrationCommandExecutor, IRelationalConnection connection, ISqlGenerationHelper sqlGenerationHelper, ICurrentDbContext currentContext, IModelRuntimeInitializer modelRuntimeInitializer, IDiagnosticsLogger<DbLoggerCategory.Migrations> logger, IRelationalCommandDiagnosticsLogger commandLogger, IDatabaseProvider databaseProvider) : base(migrationsAssembly, historyRepository, databaseCreator, migrationsSqlGenerator, rawSqlCommandBuilder, migrationCommandExecutor, connection, sqlGenerationHelper, currentContext, modelRuntimeInitializer, logger, commandLogger, databaseProvider)
{
_shardingRuntimeContext = shardingRuntimeContext;
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
_shardingConfigOptions = _shardingRuntimeContext.GetShardingConfigOptions();
_shardingProvider = _shardingRuntimeContext.GetShardingProvider();
_dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
_shardingMigrationManager = _shardingRuntimeContext.GetShardingMigrationManager();
}
public override void Migrate(string targetMigration = null)
@ -50,34 +64,62 @@ namespace ShardingCore.EFCores
}
public override async Task MigrateAsync(string targetMigration = null, CancellationToken cancellationToken = new CancellationToken())
{
var shardingProvider = _shardingRuntimeContext.GetShardingProvider();
var dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var shardingMigrationManager = _shardingRuntimeContext.GetRequiredService<IShardingMigrationManager>();
var allDataSourceNames = _virtualDataSource.GetAllDataSourceNames();
using (var scope=shardingProvider.CreateScope())
using (var scope=_shardingProvider.CreateScope())
{
using (var shellDbContext = dbContextCreator.GetShellDbContext(scope.ServiceProvider))
using (var shellDbContext = _dbContextCreator.GetShellDbContext(scope.ServiceProvider))
{
foreach (var dataSourceName in allDataSourceNames)
var migrationParallelCount = _shardingConfigOptions.MigrationParallelCount;
var partitionMigrationUnits = allDataSourceNames.Partition(migrationParallelCount);
foreach (var migrationUnits in partitionMigrationUnits)
{
using (shardingMigrationManager.CreateScope())
{
shardingMigrationManager.Current.CurrentDataSourceName = dataSourceName;
var dbContextOptions = CreateDbContextOptions(shellDbContext.GetType(),dataSourceName);
using (var dbContext = dbContextCreator.CreateDbContext(shellDbContext,new ShardingDbContextOptions(dbContextOptions,routeTailFactory.Create(string.Empty, false))))
{
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
await dbContext.Database.MigrateAsync();
}
}
}
var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList();
await ExecuteMigrateUnitsAsync(migrateUnits);
}
}
}
}
private async Task ExecuteMigrateUnitsAsync(List<MigrateUnit> migrateUnits)
{
var migrateTasks = migrateUnits.Select(migrateUnit =>
{
return Task.Run(async () =>
{
using (_shardingMigrationManager.CreateScope())
{
_shardingMigrationManager.Current.CurrentDataSourceName = migrateUnit.DataSourceName;
var dbContextOptions = CreateDbContextOptions(migrateUnit.ShellDbContext.GetType(),
migrateUnit.DataSourceName);
using (var dbContext = _dbContextCreator.CreateDbContext(migrateUnit.ShellDbContext,
new ShardingDbContextOptions(dbContextOptions,
_routeTailFactory.Create(string.Empty, false))))
{
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
await dbContext.Database.MigrateAsync();
}
}
}
return 1;
});
}).ToArray();
await TaskHelper.WhenAllFastFail(migrateTasks);
}
}
public class MigrateUnit
{
public MigrateUnit(DbContext shellDbContext, string dataSourceName)
{
ShellDbContext = shellDbContext;
DataSourceName = dataSourceName;
}
public DbContext ShellDbContext { get; }
public string DataSourceName { get; }
}
}

View File

@ -10,7 +10,9 @@ using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Core;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.EFCores.OptionsExtensions;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Extensions
@ -32,5 +34,41 @@ namespace ShardingCore.Extensions
{
return ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>();
}
/// <summary>
/// 创建共享链接DbConnection
/// </summary>
/// <param name="shardingDbContext"></param>
/// <param name="dataSourceName"></param>
/// <param name="routeTail"></param>
/// <returns></returns>
public static DbContext GetShareDbContext(this IShardingDbContext shardingDbContext,string dataSourceName,IRouteTail routeTail)
{
return shardingDbContext.GetDbContext(dataSourceName, CreateDbContextStrategyEnum.ShareConnection, routeTail);
}
/// <summary>
/// 获取独立生命周期的写连接字符串的db context
/// </summary>
/// <param name="shardingDbContext"></param>
/// <param name="dataSourceName"></param>
/// <param name="routeTail"></param>
/// <returns></returns>
public static DbContext GetIndependentWriteDbContext(this IShardingDbContext shardingDbContext,string dataSourceName,IRouteTail routeTail)
{
return shardingDbContext.GetDbContext(dataSourceName, CreateDbContextStrategyEnum.IndependentConnectionWrite, routeTail);
}
/// <summary>
/// 获取独立生命周期的读连接字符串的db context
/// </summary>
/// <param name="shardingDbContext"></param>
/// <param name="dataSourceName"></param>
/// <param name="routeTail"></param>
/// <returns></returns>
public static DbContext GetIndependentQueryDbContext(this IShardingDbContext shardingDbContext,string dataSourceName,IRouteTail routeTail)
{
return shardingDbContext.GetDbContext(dataSourceName, CreateDbContextStrategyEnum.IndependentConnectionQuery, routeTail);
}
}
}

View File

@ -13,6 +13,7 @@ using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Extensions
@ -186,7 +187,7 @@ namespace ShardingCore.Extensions
var routeTailIdentity = routeTail.GetRouteTailIdentity();
if (!dataSourceBulkDicEntries.TryGetValue(routeTailIdentity, out var bulkDicEntry))
{
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, routeTail);
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTail);
bulkDicEntry = new BulkDicEntry<TEntity>(dbContext, new LinkedList<TEntity>());
dataSourceBulkDicEntries.Add(routeTailIdentity, bulkDicEntry);
}
@ -201,7 +202,7 @@ namespace ShardingCore.Extensions
var routeTailIdentity = routeTail.GetRouteTailIdentity();
if (!dataSourceBulkDicEntries.TryGetValue(routeTailIdentity, out var bulkDicEntry))
{
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, routeTail);
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTail);
bulkDicEntry = new BulkDicEntry<TEntity>(dbContext, new LinkedList<TEntity>());
dataSourceBulkDicEntries.Add(routeTailIdentity, bulkDicEntry);
}
@ -268,7 +269,7 @@ namespace ShardingCore.Extensions
if (physicTables.IsEmpty())
throw new ShardingCoreException($"{where.ShardingPrint()} cant found any physic table");
var dbs = physicTables.Select(o => shardingDbContext.GetDbContext(dataSourceName, false, routeTailFactory.Create(o.Tail))).ToList();
var dbs = physicTables.Select(o => shardingDbContext.GetShareDbContext(dataSourceName, routeTailFactory.Create(o.Tail))).ToList();
foreach (var dbContext in dbs)
{
dbContexts.AddLast(dbContext);
@ -276,7 +277,7 @@ namespace ShardingCore.Extensions
}
else
{
var dbContext = shardingDbContext.GetDbContext(dataSourceName, false, routeTailFactory.Create(string.Empty));
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTailFactory.Create(string.Empty));
dbContexts.AddLast(dbContext);
}

View File

@ -11,8 +11,6 @@ using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
namespace ShardingCore.Sharding
@ -66,9 +64,9 @@ namespace ShardingCore.Sharding
public DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail)
public DbContext GetDbContext(string dataSourceName, CreateDbContextStrategyEnum strategy, IRouteTail routeTail)
{
return ShardingDbContextExecutor.CreateDbContext(parallelQuery, dataSourceName, routeTail);
return ShardingDbContextExecutor.CreateDbContext(strategy, dataSourceName, routeTail);
}
/// <summary>

View File

@ -17,10 +17,10 @@ namespace ShardingCore.Sharding.Abstractions
/// create DbContext
/// </summary>
/// <param name="dataSourceName">data source</param>
/// <param name="parallelQuery">true not care db context life, false need call dispose(),if false will use read connectionString</param>
/// <param name="strategy">生成db connection的策略,主要区别在于是否和主db connection一直或者是否需要缓存其connection还有是否是独立声明周期的区别</param>
/// <param name="routeTail"></param>
/// <returns></returns>
DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail);
DbContext GetDbContext(string dataSourceName, CreateDbContextStrategyEnum strategy, IRouteTail routeTail);
/// <summary>
/// 创建通用的db context

View File

@ -40,11 +40,11 @@ namespace ShardingCore.Sharding.Abstractions
/// <summary>
/// create sharding db context options
/// </summary>
/// <param name="parallelQuery">this query has >1 connection query</param>
/// <param name="strategy">如果当前查询需要多链接的情况下那么将使用<code>IndependentConnectionQuery</code>否则使用<code>ShareConnection</code></param>
/// <param name="dataSourceName">data source name</param>
/// <param name="routeTail"></param>
/// <returns></returns>
DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail);
DbContext CreateDbContext(CreateDbContextStrategyEnum strategy, string dataSourceName, IRouteTail routeTail);
/// <summary>
/// create db context by entity
/// </summary>

View File

@ -0,0 +1,19 @@
namespace ShardingCore.Sharding
{
public enum CreateDbContextStrategyEnum
{
/// <summary>
/// 共享链接(只是用写链接字符串) 无需管理connection的生命周期
/// 简单说就是无需调用dispose
/// </summary>
ShareConnection,
/// <summary>
/// 并行查询链接(有可能会使用读写分离链接字符串) 独立生命周期需要手动dispose或者等系统调用
/// </summary>
IndependentConnectionQuery,
/// <summary>
/// 并行写链接(只是用写链接字符串) 独立生命周期需要手动dispose或者等系统调用
/// </summary>
IndependentConnectionWrite
}
}

View File

@ -87,37 +87,38 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public bool IsMultiDbContext =>
_dbContextCaches.Count > 1 || _dbContextCaches.Sum(o => o.Value.DbContextCount) > 1;
public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail)
public DbContext CreateDbContext(CreateDbContextStrategyEnum strategy, string dataSourceName, IRouteTail routeTail)
{
if (!parallelQuery)
if (CreateDbContextStrategyEnum.ShareConnection==strategy)
{
var dataSourceDbContext = GetDataSourceDbContext(dataSourceName);
return dataSourceDbContext.CreateDbContext(routeTail);
}
else
{
var parallelDbContextOptions = CreateParallelDbContextOptions(dataSourceName);
var parallelDbContextOptions = CreateParallelDbContextOptions(dataSourceName,strategy);
var dbContext = _dbContextCreator.CreateDbContext(_shardingDbContext, parallelDbContextOptions, routeTail);
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
return dbContext;
}
}
private DbContextOptions CreateParallelDbContextOptions(string dataSourceName)
private DbContextOptions CreateParallelDbContextOptions(string dataSourceName,CreateDbContextStrategyEnum strategy)
{
var dbContextOptionBuilder = DataSourceDbContext.CreateDbContextOptionBuilder(_shardingDbContext.GetType());
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, false);
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, CreateDbContextStrategyEnum.IndependentConnectionWrite==strategy);
_virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder).UseShardingOptions(_shardingRuntimeContext);
return dbContextOptionBuilder.Options;
}
public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
{
var dataSourceName = GetDataSourceName(entity);
var tail = GetTableTail(entity);
return CreateDbContext(false, dataSourceName, _routeTailFactory.Create(tail));
return CreateDbContext(CreateDbContextStrategyEnum.ShareConnection, dataSourceName, _routeTailFactory.Create(tail));
}
public IVirtualDataSource GetVirtualDataSource()

View File

@ -170,7 +170,10 @@ namespace ShardingCore.Sharding.ShardingExecutors
//要么本次查询不追踪如果需要追踪不可以存在跨tails
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var sqlRouteUnit = _shardingRouteResult.RouteUnits.First();
var dbContext = GetShardingDbContext().GetDbContext(sqlRouteUnit.DataSourceName, IsParallelQuery(), routeTailFactory.Create(sqlRouteUnit.TableRouteResult));
var strategy = !IsParallelQuery()
? CreateDbContextStrategyEnum.ShareConnection
: CreateDbContextStrategyEnum.IndependentConnectionQuery;
var dbContext = GetShardingDbContext().GetDbContext(sqlRouteUnit.DataSourceName,strategy , routeTailFactory.Create(sqlRouteUnit.TableRouteResult));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, GetQueryExpression());
}
}

View File

@ -171,7 +171,11 @@ namespace ShardingCore.Sharding.ShardingExecutors
{
var virtualDataSource = _shardingDbContext.GetVirtualDataSource();
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, IsParallelQuery(), routeTailFactory.Create(string.Empty));
var strategy = !IsParallelQuery()
? CreateDbContextStrategyEnum.ShareConnection
: CreateDbContextStrategyEnum.IndependentConnectionQuery;
var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, strategy, routeTailFactory.Create(string.Empty));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, _queryExpression);
}
}

View File

@ -33,13 +33,13 @@ namespace ShardingCore.Sharding
* @Date: Monday, 25 January 2021 11:38:27
* @Email: 326308290@qq.com
*/
public class StreamMergeContext : IMergeParseContext, IDisposable, IPrint
public class StreamMergeContext : IMergeParseContext, IDisposable, IPrint
#if !EFCORE2
, IAsyncDisposable
#endif
{
public IMergeQueryCompilerContext MergeQueryCompilerContext { get; }
public IShardingRuntimeContext ShardingRuntimeContext{ get; }
public IShardingRuntimeContext ShardingRuntimeContext { get; }
public IParseResult ParseResult { get; }
public IQueryable RewriteQueryable { get; }
public IOptimizeResult OptimizeResult { get; }
@ -48,7 +48,7 @@ namespace ShardingCore.Sharding
private readonly IRouteTailFactory _routeTailFactory;
public int? Skip { get; private set; }
public int? Take { get; private set;}
public int? Take { get; private set; }
public PropertyOrder[] Orders { get; private set; }
public SelectContext SelectContext => ParseResult.GetSelectContext();
@ -59,7 +59,7 @@ namespace ShardingCore.Sharding
/// 本次查询涉及的对象
/// </summary>
public ISet<Type> QueryEntities { get; }
/// <summary>
/// 本次查询跨库
@ -84,9 +84,10 @@ namespace ShardingCore.Sharding
public bool TailComparerNeedReverse => OptimizeResult.SameWithTailComparer();
public StreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,IParseResult parseResult,IRewriteResult rewriteResult,IOptimizeResult optimizeResult,
IRouteTailFactory routeTailFactory,ITrackerManager trackerManager,IShardingRouteConfigOptions shardingRouteConfigOptions)
public StreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext, IParseResult parseResult,
IRewriteResult rewriteResult, IOptimizeResult optimizeResult,
IRouteTailFactory routeTailFactory, ITrackerManager trackerManager,
IShardingRouteConfigOptions shardingRouteConfigOptions)
{
MergeQueryCompilerContext = mergeQueryCompilerContext;
ShardingRuntimeContext = ((DbContext)mergeQueryCompilerContext.GetShardingDbContext())
@ -96,8 +97,8 @@ namespace ShardingCore.Sharding
OptimizeResult = optimizeResult;
_rewriteResult = rewriteResult;
_routeTailFactory = routeTailFactory;
QueryEntities= MergeQueryCompilerContext.GetQueryEntities().Keys.ToHashSet();
_trackerManager =trackerManager;
QueryEntities = MergeQueryCompilerContext.GetQueryEntities().Keys.ToHashSet();
_trackerManager = trackerManager;
_shardingRouteConfigOptions = shardingRouteConfigOptions;
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
Orders = parseResult.GetOrderByContext().PropertyOrders.ToArray();
@ -114,10 +115,12 @@ namespace ShardingCore.Sharding
{
Skip = skip;
}
public void ReSetTake(int? take)
{
Take = take;
}
/// <summary>
/// 创建对应的dbcontext
/// </summary>
@ -126,17 +129,21 @@ namespace ShardingCore.Sharding
/// <returns></returns>
public DbContext CreateDbContext(ISqlRouteUnit sqlRouteUnit, ConnectionModeEnum connectionMode)
{
var routeTail = _routeTailFactory.Create(sqlRouteUnit.TableRouteResult);
//如果开启了读写分离或者本次查询是跨表的表示本次查询的dbcontext是不存储的用完后就直接dispose
var parallelQuery = IsParallelQuery();
var dbContext = GetShardingDbContext().GetDbContext(sqlRouteUnit.DataSourceName, parallelQuery, routeTail);
var strategy = !parallelQuery
? CreateDbContextStrategyEnum.ShareConnection
: CreateDbContextStrategyEnum.IndependentConnectionQuery;
var dbContext = GetShardingDbContext().GetDbContext(sqlRouteUnit.DataSourceName, strategy, routeTail);
if (parallelQuery && RealConnectionMode(connectionMode) == ConnectionModeEnum.MEMORY_STRICTLY)
{
_parallelDbContexts.TryAdd(dbContext, null);
}
return dbContext;
}
/// <summary>
/// 因为并发查询情况下那么你是内存就是内存你是流式就是流式
/// 如果不是并发查询的情况下系统会将当前dbcontext进行利用起来所以只能是流式
@ -164,6 +171,7 @@ namespace ShardingCore.Sharding
{
return RewriteQueryable;
}
public IQueryable GetOriginalQueryable()
{
return MergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
@ -202,8 +210,10 @@ namespace ShardingCore.Sharding
public bool IsSingleShardingEntityQuery()
{
return QueryEntities.Where(o => MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o)).Take(2).Count() == 1;
return QueryEntities.Where(o => MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o)).Take(2)
.Count() == 1;
}
public Type GetSingleShardingEntityType()
{
return QueryEntities.Single(o => MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o));
@ -222,10 +232,12 @@ namespace ShardingCore.Sharding
{
return OptimizeResult.GetMaxQueryConnectionsLimit();
}
public ConnectionModeEnum GetConnectionMode(int sqlCount)
{
return CalcConnectionMode(sqlCount);
}
private ConnectionModeEnum CalcConnectionMode(int sqlCount)
{
switch (OptimizeResult.GetConnectionMode())
@ -236,7 +248,8 @@ namespace ShardingCore.Sharding
{
return GetMaxQueryConnectionsLimit() < sqlCount
? ConnectionModeEnum.CONNECTION_STRICTLY
: ConnectionModeEnum.MEMORY_STRICTLY; ;
: ConnectionModeEnum.MEMORY_STRICTLY;
;
}
}
}
@ -247,7 +260,8 @@ namespace ShardingCore.Sharding
/// <returns></returns>
private bool IsUseReadWriteSeparation()
{
return GetShardingDbContext().IsUseReadWriteSeparation() && GetShardingDbContext().CurrentIsReadWriteSeparation();
return GetShardingDbContext().IsUseReadWriteSeparation() &&
GetShardingDbContext().CurrentIsReadWriteSeparation();
}
/// <summary>
@ -269,6 +283,7 @@ namespace ShardingCore.Sharding
return false;
return QueryTrack() && _trackerManager.EntityUseTrack(entityType);
}
private bool QueryTrack()
{
return MergeQueryCompilerContext.IsQueryTrack();
@ -279,16 +294,16 @@ namespace ShardingCore.Sharding
return GetShardingDbContext().GetShardingRuntimeContext().GetRequiredService<IShardingComparer>();
}
/// <summary>
/// 如果返回false那么就说明不需要继续查询了
/// 返回true表示需要继续查询
/// </summary>
/// <param name="emptyFunc"></param>
/// <param name="r"></param>
/// <typeparam name="TResult"></typeparam>
/// <returns></returns>
/// <exception cref="ShardingCoreQueryRouteNotMatchException"></exception>
public bool TryPrepareExecuteContinueQuery<TResult>(Func<TResult> emptyFunc,out TResult r)
/// <summary>
/// 如果返回false那么就说明不需要继续查询了
/// 返回true表示需要继续查询
/// </summary>
/// <param name="emptyFunc"></param>
/// <param name="r"></param>
/// <typeparam name="TResult"></typeparam>
/// <returns></returns>
/// <exception cref="ShardingCoreQueryRouteNotMatchException"></exception>
public bool TryPrepareExecuteContinueQuery<TResult>(Func<TResult> emptyFunc, out TResult r)
{
if (TakeZeroNoQueryExecute())
{
@ -300,7 +315,8 @@ namespace ShardingCore.Sharding
{
if (ThrowIfQueryRouteNotMatch())
{
throw new ShardingCoreQueryRouteNotMatchException(MergeQueryCompilerContext.GetQueryExpression().ShardingPrint());
throw new ShardingCoreQueryRouteNotMatchException(MergeQueryCompilerContext.GetQueryExpression()
.ShardingPrint());
}
else
{
@ -312,6 +328,7 @@ namespace ShardingCore.Sharding
r = default;
return true;
}
/// <summary>
/// 无路由匹配
/// </summary>
@ -339,6 +356,7 @@ namespace ShardingCore.Sharding
{
return MergeQueryCompilerContext.UseUnionAllMerge();
}
public void Dispose()
{
foreach (var dbContext in _parallelDbContexts.Keys)
@ -381,7 +399,8 @@ namespace ShardingCore.Sharding
{
if (Orders.Any())
{
var propertyOrders = Orders.Select(o => new PropertyOrder(o.PropertyExpression, !o.IsAsc, o.OwnerType)).ToArray();
var propertyOrders = Orders.Select(o => new PropertyOrder(o.PropertyExpression, !o.IsAsc, o.OwnerType))
.ToArray();
ReSetOrders(propertyOrders);
}
}

View File

@ -1,194 +0,0 @@
// using Microsoft.EntityFrameworkCore;
// using Microsoft.Extensions.DependencyInjection;
// using ShardingCore.Core.ShardingConfigurations.Abstractions;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
// using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
// using ShardingCore.Exceptions;
// using ShardingCore.Extensions;
// using ShardingCore.Sharding.Abstractions;
// using System;
// using System.Collections.Generic;
// using System.Linq;
// using System.Reflection;
// using ShardingCore.Core.EntityMetadatas;
// using ShardingCore.Core.TrackerManagers;
// using ShardingCore.Core.VirtualDatabase.VirtualTables;
//
// namespace ShardingCore
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: Saturday, 02 January 2021 19:37:27
// * @Email: 326308290@qq.com
// */
// /// <summary>
// /// 分片容器全局唯一提供静态依赖注入<code>IServiceProvider</code>
// /// </summary>
// public class ShardingContainer
// {
// private ShardingContainer()
// {
//
// }
//
// private static IServiceProvider serviceProvider;
//
// public static IServiceProvider ServiceProvider
// {
// get { return serviceProvider ?? throw new ShardingCoreInvalidOperationException("sharding core not start"); }
// }
// /// <summary>
// /// 静态注入
// /// </summary>
// /// <param name="services"></param>
// public static void SetServices(IServiceProvider services)
// {
// if (serviceProvider == null)
// serviceProvider = services;
// }
//
// /// <summary>
// /// 获取服务
// /// </summary>
// /// <typeparam name="T"></typeparam>
// /// <returns></returns>
// public static T GetService<T>()
// {
// return ServiceProvider.GetService<T>();
// }
// /// <summary>
// /// 获取服务集合
// /// </summary>
// /// <typeparam name="T"></typeparam>
// /// <returns></returns>
// public static IEnumerable<T> GetServices<T>()
// {
// return ServiceProvider.GetServices<T>();
// }
// /// <summary>
// /// 根据类型获取服务
// /// </summary>
// /// <param name="serviceType"></param>
// /// <returns></returns>
// public static object GetService(Type serviceType)
// {
// return ServiceProvider.GetService(serviceType);
// }
// /// <summary>
// /// 创建一个没有依赖注入的对象,但是对象的构造函数参数是已经可以通过依赖注入获取的
// /// </summary>
// /// <param name="serviceType"></param>
// /// <returns></returns>
// /// <exception cref="ArgumentException"></exception>
// public static object CreateInstance(Type serviceType)
// {
// var constructors
// = serviceType.GetTypeInfo().DeclaredConstructors
// .Where(c => !c.IsStatic && c.IsPublic)
// .ToArray();
//
// if (constructors.Length != 1)
// {
// throw new ArgumentException(
// $"type :[{serviceType}] found more than one declared constructor ");
// }
// var @params = constructors[0].GetParameters().Select(x => ServiceProvider.GetService(x.ParameterType))
// .ToArray();
// return Activator.CreateInstance(serviceType, @params);
// }
// /// <summary>
// /// 创建一个没有依赖注入的对象,但是对象的构造函数参数是已经可以通过依赖注入获取并且也存在自行传入的参数,优先判断自行传入的参数
// /// </summary>
// /// <param name="serviceType"></param>
// /// <param name="args"></param>
// /// <returns></returns>
// /// <exception cref="ArgumentException"></exception>
// public static object CreateInstanceWithInputParams(Type serviceType, params object[] args)
// {
// var constructors
// = serviceType.GetTypeInfo().DeclaredConstructors
// .Where(c => !c.IsStatic && c.IsPublic)
// .ToArray();
//
// if (constructors.Length != 1)
// {
// throw new ArgumentException(
// $"type :[{serviceType}] found more than one declared constructor ");
// }
//
// var argIsNotEmpty = args.IsNotEmpty();
// var @params = constructors[0].GetParameters().Select(x =>
// {
// if (argIsNotEmpty)
// {
// var arg = args.FirstOrDefault(o => o.GetType() == x.ParameterType);
// if (arg != null)
// return arg;
// }
// return ServiceProvider.GetService(x.ParameterType);
// })
// .ToArray();
// return Activator.CreateInstance(serviceType, @params);
// }
//
// //public static IShardingConfigOption<TShardingDbContext> GetRequiredShardingConfigOption<TShardingDbContext>()
// // where TShardingDbContext : DbContext, IShardingDbContext
// //{
// // return (IShardingConfigOption<TShardingDbContext>)GetRequiredShardingConfigOption(typeof(TShardingDbContext));
// //}
// //public static IShardingConfigOption GetRequiredShardingConfigOption(Type shardingDbContextType)
// //{
// // return (IShardingConfigOption)ServiceProvider.GetService(typeof(IShardingConfigOption<>).GetGenericType0(shardingDbContextType));
// //}
// public static IShardingRouteConfigOptions<TShardingDbContext> GetRequiredShardingEntityConfigOption<TShardingDbContext>()
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// return (IShardingRouteConfigOptions<TShardingDbContext>)GetRequiredShardingEntityConfigOption(typeof(TShardingDbContext));
// }
// public static IShardingRouteConfigOptions GetRequiredShardingEntityConfigOption(Type shardingDbContextType)
// {
// return (IShardingRouteConfigOptions)ServiceProvider.GetService(typeof(IShardingRouteConfigOptions<>).GetGenericType0(shardingDbContextType));
// }
//
// public static IVirtualDataSourceManager<TShardingDbContext> GetRequiredVirtualDataSourceManager<TShardingDbContext>()
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// return (IVirtualDataSourceManager<TShardingDbContext>)GetRequiredVirtualDataSourceManager(typeof(TShardingDbContext));
// }
// public static IVirtualDataSourceManager GetRequiredVirtualDataSourceManager(Type shardingDbContextType)
// {
// return (IVirtualDataSourceManager)ServiceProvider.GetService(typeof(IVirtualDataSourceManager<>).GetGenericType0(shardingDbContextType));
// }
// public static IVirtualDataSource<TShardingDbContext> GetRequiredCurrentVirtualDataSource<TShardingDbContext>()
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// return GetRequiredVirtualDataSourceManager<TShardingDbContext>().GetCurrentVirtualDataSource() ?? throw new InvalidOperationException("cant resolve CurrentVirtualDataSource");
// }
// public static IVirtualDataSource GetRequiredCurrentVirtualDataSource(Type shardingDbContextType)
// {
// return GetRequiredVirtualDataSourceManager(shardingDbContextType).GetCurrentVirtualDataSource()??throw new InvalidOperationException("cant resolve CurrentVirtualDataSource");
// }
//
// public static IEntityMetadataManager GetRequiredEntityMetadataManager(Type shardingDbContextType)
// {
// return (IEntityMetadataManager)ServiceProvider.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContextType));
// }
//
// public static IEntityMetadataManager<TShardingDbContext> GetRequiredEntityMetadataManager<TShardingDbContext>()
// where TShardingDbContext : DbContext, IShardingDbContext
// {
// return (IEntityMetadataManager<TShardingDbContext>)GetRequiredEntityMetadataManager(typeof(TShardingDbContext));
// }
//
// public static ITrackerManager GetTrackerManager(Type shardingDbContextType)
// {
// return (ITrackerManager)ServiceProvider.GetService(typeof(ITrackerManager<>).GetGenericType0(shardingDbContextType));
// }
//
// public static IVirtualTableManager GetVirtualTableManager(Type shardingDbContextType)
// {
// return (IVirtualTableManager)ServiceProvider.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContextType));
// }
// }
// }

View File

@ -13,6 +13,7 @@ using System.Threading;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.ServiceProviders;
using ShardingCore.Logger;
using ShardingCore.Sharding;
namespace ShardingCore.TableCreator
{
@ -60,7 +61,7 @@ namespace ShardingCore.TableCreator
using (var shellDbContext = _dbContextCreator.GetShellDbContext(scope.ServiceProvider))
{
using (var context = ((IShardingDbContext)shellDbContext).GetDbContext(dataSourceName, true,
using (var context = ((IShardingDbContext)shellDbContext).GetIndependentWriteDbContext(dataSourceName,
_routeTailFactory.Create(tail, false)))
{
context.RemoveDbContextRelationModelSaveOnlyThatIsNamedType(shardingEntityType);

View File

@ -4,6 +4,8 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.Abstractions;
using System.Collections.Generic;
using System.Data.Common;
using ShardingCore.Extensions;
using ShardingCore.Sharding;
namespace ShardingCore.TableExists.Abstractions
{
@ -18,7 +20,7 @@ namespace ShardingCore.TableExists.Abstractions
public ISet<string> GetExistTables(IShardingDbContext shardingDbContext, string dataSourceName)
{
using (var dbContext =
shardingDbContext.GetDbContext(dataSourceName, true, RouteTailFactory.Create(string.Empty)))
shardingDbContext.GetIndependentWriteDbContext(dataSourceName, RouteTailFactory.Create(string.Empty)))
{
var dbConnection = dbContext.Database.GetDbConnection();
dbConnection.Open();