针对ShardingMigration和UseAutoTryCompensateTable进行并发优化

This commit is contained in:
xuejiaming 2022-07-05 11:09:26 +08:00
parent d1c2924029
commit deffbd5f77
5 changed files with 104 additions and 41 deletions

View File

@ -164,8 +164,10 @@ namespace Sample.MySql
} }
} }
Stopwatch sp = Stopwatch.StartNew();
app.ApplicationServices.UseAutoTryCompensateTable(); app.ApplicationServices.UseAutoTryCompensateTable();
sp.Stop();
Console.WriteLine("UseAutoTryCompensateTable:"+sp.ElapsedMilliseconds);
app.UseRouting(); app.UseRouting();
app.UseAuthorization(); app.UseAuthorization();

View File

@ -0,0 +1,17 @@
using Microsoft.EntityFrameworkCore;
namespace ShardingCore.EFCores
{
public class MigrateUnit
{
public MigrateUnit(DbContext shellDbContext, string dataSourceName)
{
ShellDbContext = shellDbContext;
DataSourceName = dataSourceName;
}
public DbContext ShellDbContext { get; }
public string DataSourceName { get; }
}
}

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -113,27 +114,31 @@ namespace ShardingCore.EFCores
} }
public override async Task MigrateAsync(string targetMigration = null, CancellationToken cancellationToken = new CancellationToken()) public override async Task MigrateAsync(string targetMigration = null, CancellationToken cancellationToken = new CancellationToken())
{ {
var defaultDataSourceName = _virtualDataSource.DefaultDataSourceName;
var allDataSourceNames = _virtualDataSource.GetAllDataSourceNames(); var allDataSourceNames = _virtualDataSource.GetAllDataSourceNames();
using (var scope=_shardingProvider.CreateScope()) using (var scope=_shardingProvider.CreateScope())
{ {
using (var shellDbContext = _dbContextCreator.GetShellDbContext(scope.ServiceProvider)) using (var shellDbContext = _dbContextCreator.GetShellDbContext(scope.ServiceProvider))
{ {
var migrationParallelCount = _shardingConfigOptions.MigrationParallelCount; var migrationParallelCount = _shardingConfigOptions.MigrationParallelCount;
var partitionMigrationUnits = allDataSourceNames.Partition(migrationParallelCount); //默认数据源需要最后执行 否则可能会导致异常的情况下GetPendingMigrations为空
var partitionMigrationUnits = allDataSourceNames.Where(o=>o!=defaultDataSourceName).Partition(migrationParallelCount);
foreach (var migrationUnits in partitionMigrationUnits) foreach (var migrationUnits in partitionMigrationUnits)
{ {
var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList(); var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList();
await ExecuteMigrateUnitsAsync(migrateUnits); await ExecuteMigrateUnitsAsync(migrateUnits);
} }
await ExecuteMigrateUnitsAsync(new List<MigrateUnit>(){new MigrateUnit(shellDbContext,defaultDataSourceName)});
} }
} }
} }
private async Task ExecuteMigrateUnitsAsync(List<MigrateUnit> migrateUnits) private async Task ExecuteMigrateUnitsAsync(List<MigrateUnit> migrateUnits)
{ {
var migrateTasks = migrateUnits.Select(migrateUnit => var migrateTasks = migrateUnits.Select(migrateUnit =>
{ {
return Task.Run(async () => return Task.Run( () =>
{ {
using (_shardingMigrationManager.CreateScope()) using (_shardingMigrationManager.CreateScope())
{ {
@ -146,29 +151,19 @@ namespace ShardingCore.EFCores
new ShardingDbContextOptions(dbContextOptions, new ShardingDbContextOptions(dbContextOptions,
_routeTailFactory.Create(string.Empty, false)))) _routeTailFactory.Create(string.Empty, false))))
{ {
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any()) if (( dbContext.Database.GetPendingMigrations()).Any())
{ {
await dbContext.Database.MigrateAsync(); dbContext.Database.Migrate();
}
} }
} }
}
return 1; return 1;
}); });
}).ToArray(); }).ToArray();
await TaskHelper.WhenAllFastFail(migrateTasks); await TaskHelper.WhenAllFastFail(migrateTasks);
} }
} }
public class MigrateUnit
{
public MigrateUnit(DbContext shellDbContext, string dataSourceName)
{
ShellDbContext = shellDbContext;
DataSourceName = dataSourceName;
}
public DbContext ShellDbContext { get; }
public string DataSourceName { get; }
}
} }

View File

@ -0,0 +1,12 @@
namespace ShardingCore
{
public class InitConfigureUnit
{
public InitConfigureUnit(string dataSourceName)
{
DataSourceName = dataSourceName;
}
public string DataSourceName { get; }
}
}

View File

@ -24,6 +24,9 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors; using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.TableCreator; using ShardingCore.TableCreator;
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Bootstrappers; using ShardingCore.Bootstrappers;
using ShardingCore.Core.DbContextCreator; using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.QueryTrackers; using ShardingCore.Core.QueryTrackers;
@ -38,6 +41,7 @@ using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes; using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes; using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.DynamicDataSources; using ShardingCore.DynamicDataSources;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeContexts; using ShardingCore.Sharding.MergeContexts;
using ShardingCore.Sharding.ParallelTables; using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.Parsers; using ShardingCore.Sharding.Parsers;
@ -70,7 +74,8 @@ namespace ShardingCore
/// <param name="optionsLifetime"></param> /// <param name="optionsLifetime"></param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="NotSupportedException"></exception> /// <exception cref="NotSupportedException"></exception>
public static ShardingCoreConfigBuilder<TShardingDbContext> AddShardingDbContext<TShardingDbContext>(this IServiceCollection services, public static ShardingCoreConfigBuilder<TShardingDbContext> AddShardingDbContext<TShardingDbContext>(
this IServiceCollection services,
ServiceLifetime contextLifetime = ServiceLifetime.Scoped, ServiceLifetime contextLifetime = ServiceLifetime.Scoped,
ServiceLifetime optionsLifetime = ServiceLifetime.Scoped) ServiceLifetime optionsLifetime = ServiceLifetime.Scoped)
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
@ -79,39 +84,45 @@ namespace ShardingCore
throw new NotSupportedException($"{nameof(contextLifetime)}:{nameof(ServiceLifetime.Singleton)}"); throw new NotSupportedException($"{nameof(contextLifetime)}:{nameof(ServiceLifetime.Singleton)}");
if (optionsLifetime == ServiceLifetime.Singleton) if (optionsLifetime == ServiceLifetime.Singleton)
throw new NotSupportedException($"{nameof(optionsLifetime)}:{nameof(ServiceLifetime.Singleton)}"); throw new NotSupportedException($"{nameof(optionsLifetime)}:{nameof(ServiceLifetime.Singleton)}");
services.AddDbContext<TShardingDbContext>(UseDefaultSharding<TShardingDbContext>, contextLifetime, optionsLifetime); services.AddDbContext<TShardingDbContext>(UseDefaultSharding<TShardingDbContext>, contextLifetime,
optionsLifetime);
return services.AddShardingConfigure<TShardingDbContext>(); return services.AddShardingConfigure<TShardingDbContext>();
} }
public static ShardingCoreConfigBuilder<TShardingDbContext> AddShardingConfigure<TShardingDbContext>(this IServiceCollection services) public static ShardingCoreConfigBuilder<TShardingDbContext> AddShardingConfigure<TShardingDbContext>(
this IServiceCollection services)
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
{ {
//ShardingCoreHelper.CheckContextConstructors<TShardingDbContext>(); //ShardingCoreHelper.CheckContextConstructors<TShardingDbContext>();
return new ShardingCoreConfigBuilder<TShardingDbContext>(services); return new ShardingCoreConfigBuilder<TShardingDbContext>(services);
} }
public static void UseDefaultSharding<TShardingDbContext>(IServiceProvider serviceProvider,DbContextOptionsBuilder dbContextOptionsBuilder) where TShardingDbContext : DbContext, IShardingDbContext public static void UseDefaultSharding<TShardingDbContext>(IServiceProvider serviceProvider,
DbContextOptionsBuilder dbContextOptionsBuilder) where TShardingDbContext : DbContext, IShardingDbContext
{ {
var shardingRuntimeContext = serviceProvider.GetRequiredService<IShardingRuntimeContext>(); var shardingRuntimeContext = serviceProvider.GetRequiredService<IShardingRuntimeContext>();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource(); var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var connectionString = virtualDataSource.GetConnectionString(virtualDataSource.DefaultDataSourceName); var connectionString = virtualDataSource.GetConnectionString(virtualDataSource.DefaultDataSourceName);
var contextOptionsBuilder = virtualDataSource.ConfigurationParams.UseDbContextOptionsBuilder(connectionString, dbContextOptionsBuilder) var contextOptionsBuilder = virtualDataSource.ConfigurationParams
.UseDbContextOptionsBuilder(connectionString, dbContextOptionsBuilder)
.UseSharding<TShardingDbContext>(shardingRuntimeContext); .UseSharding<TShardingDbContext>(shardingRuntimeContext);
virtualDataSource.ConfigurationParams.UseShellDbContextOptionBuilder(contextOptionsBuilder); virtualDataSource.ConfigurationParams.UseShellDbContextOptionBuilder(contextOptionsBuilder);
} }
internal static IServiceCollection AddInternalShardingCore<TShardingDbContext>(this IServiceCollection services) where TShardingDbContext : DbContext, IShardingDbContext
internal static IServiceCollection AddInternalShardingCore<TShardingDbContext>(this IServiceCollection services)
where TShardingDbContext : DbContext, IShardingDbContext
{ {
services.TryAddSingleton<IShardingInitializer, ShardingInitializer>(); services.TryAddSingleton<IShardingInitializer, ShardingInitializer>();
services.TryAddSingleton<IShardingBootstrapper, ShardingBootstrapper>(); services.TryAddSingleton<IShardingBootstrapper, ShardingBootstrapper>();
services.TryAddSingleton<IDataSourceInitializer, DataSourceInitializer>(); services.TryAddSingleton<IDataSourceInitializer, DataSourceInitializer>();
services.TryAddSingleton<ITableRouteManager, TableRouteManager>(); services.TryAddSingleton<ITableRouteManager, TableRouteManager>();
services.TryAddSingleton<IVirtualDataSourceConfigurationParams, SimpleVirtualDataSourceConfigurationParams>(); services
.TryAddSingleton<IVirtualDataSourceConfigurationParams, SimpleVirtualDataSourceConfigurationParams>();
//分表dbcontext创建 //分表dbcontext创建
services.TryAddSingleton<IDbContextCreator, ActivatorDbContextCreator<TShardingDbContext>>(); services.TryAddSingleton<IDbContextCreator, ActivatorDbContextCreator<TShardingDbContext>>();
// services.TryAddSingleton<IDataSourceInitializer<TShardingDbContext>, DataSourceInitializer<TShardingDbContext>>(); // services.TryAddSingleton<IDataSourceInitializer<TShardingDbContext>, DataSourceInitializer<TShardingDbContext>>();
services.TryAddSingleton<ITrackerManager, TrackerManager>(); services.TryAddSingleton<ITrackerManager, TrackerManager>();
services.TryAddSingleton<IStreamMergeContextFactory, StreamMergeContextFactory>(); services.TryAddSingleton<IStreamMergeContextFactory, StreamMergeContextFactory>();
@ -173,15 +184,22 @@ namespace ShardingCore
services.TryAddShardingJob(); services.TryAddShardingJob();
return services; return services;
} }
public static DbContextOptionsBuilder UseSharding<TShardingDbContext>(this DbContextOptionsBuilder optionsBuilder,IShardingRuntimeContext shardingRuntimeContext) where TShardingDbContext : DbContext, IShardingDbContext
public static DbContextOptionsBuilder UseSharding<TShardingDbContext>(
this DbContextOptionsBuilder optionsBuilder, IShardingRuntimeContext shardingRuntimeContext)
where TShardingDbContext : DbContext, IShardingDbContext
{ {
return optionsBuilder.UseShardingWrapMark().UseShardingOptions(shardingRuntimeContext) return optionsBuilder.UseShardingWrapMark().UseShardingOptions(shardingRuntimeContext)
.ReplaceService<IDbSetSource, ShardingDbSetSource>() .ReplaceService<IDbSetSource, ShardingDbSetSource>()
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>() .ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
.ReplaceService<IDbContextTransactionManager, ShardingRelationalTransactionManager<TShardingDbContext>>() .ReplaceService<IDbContextTransactionManager,
.ReplaceService<IRelationalTransactionFactory, ShardingRelationalTransactionFactory<TShardingDbContext>>(); ShardingRelationalTransactionManager<TShardingDbContext>>()
.ReplaceService<IRelationalTransactionFactory,
ShardingRelationalTransactionFactory<TShardingDbContext>>();
} }
public static DbContextOptionsBuilder UseShardingOptions(this DbContextOptionsBuilder optionsBuilder,IShardingRuntimeContext shardingRuntimeContext)
public static DbContextOptionsBuilder UseShardingOptions(this DbContextOptionsBuilder optionsBuilder,
IShardingRuntimeContext shardingRuntimeContext)
{ {
var shardingOptionsExtension = optionsBuilder.CreateOrGetShardingOptionsExtension(shardingRuntimeContext); var shardingOptionsExtension = optionsBuilder.CreateOrGetShardingOptionsExtension(shardingRuntimeContext);
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(shardingOptionsExtension); ((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(shardingOptionsExtension);
@ -196,17 +214,20 @@ namespace ShardingCore
return optionsBuilder; return optionsBuilder;
} }
private static ShardingWrapOptionsExtension CreateOrGetShardingWrapExtension(this DbContextOptionsBuilder optionsBuilder) private static ShardingWrapOptionsExtension CreateOrGetShardingWrapExtension(
this DbContextOptionsBuilder optionsBuilder)
=> optionsBuilder.Options.FindExtension<ShardingWrapOptionsExtension>() ?? => optionsBuilder.Options.FindExtension<ShardingWrapOptionsExtension>() ??
new ShardingWrapOptionsExtension(); new ShardingWrapOptionsExtension();
private static ShardingOptionsExtension CreateOrGetShardingOptionsExtension(this DbContextOptionsBuilder optionsBuilder,IShardingRuntimeContext shardingRuntimeContext)
private static ShardingOptionsExtension CreateOrGetShardingOptionsExtension(
this DbContextOptionsBuilder optionsBuilder, IShardingRuntimeContext shardingRuntimeContext)
=> optionsBuilder.Options.FindExtension<ShardingOptionsExtension>() ?? => optionsBuilder.Options.FindExtension<ShardingOptionsExtension>() ??
new ShardingOptionsExtension(shardingRuntimeContext); new ShardingOptionsExtension(shardingRuntimeContext);
public static DbContextOptionsBuilder UseInnerDbContextSharding(this DbContextOptionsBuilder optionsBuilder) public static DbContextOptionsBuilder UseInnerDbContextSharding(this DbContextOptionsBuilder optionsBuilder)
{ {
return optionsBuilder.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>() return optionsBuilder.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>()
.ReplaceService<IModelSource,ShardingModelSource>() .ReplaceService<IModelSource, ShardingModelSource>()
.ReplaceService<IModelCustomizer, ShardingModelCustomizer>(); .ReplaceService<IModelCustomizer, ShardingModelCustomizer>();
} }
@ -221,23 +242,39 @@ namespace ShardingCore
shardingRuntimeContext.CheckRequirement(); shardingRuntimeContext.CheckRequirement();
shardingRuntimeContext.AutoShardingCreate(); shardingRuntimeContext.AutoShardingCreate();
} }
/// <summary> /// <summary>
/// 自动尝试补偿表 /// 自动尝试补偿表
/// </summary> /// </summary>
/// <param name="serviceProvider"></param> /// <param name="serviceProvider"></param>
public static void UseAutoTryCompensateTable(this IServiceProvider serviceProvider) /// <param name="parallelCount"></param>
public static void UseAutoTryCompensateTable(this IServiceProvider serviceProvider, int? parallelCount = null)
{ {
var shardingRuntimeContext = serviceProvider.GetRequiredService<IShardingRuntimeContext>(); var shardingRuntimeContext = serviceProvider.GetRequiredService<IShardingRuntimeContext>();
shardingRuntimeContext.CheckRequirement(); shardingRuntimeContext.CheckRequirement();
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource(); var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
var dataSourceInitializer = shardingRuntimeContext.GetDataSourceInitializer(); var dataSourceInitializer = shardingRuntimeContext.GetDataSourceInitializer();
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
var compensateTableParallelCount = parallelCount ?? shardingConfigOptions.CompensateTableParallelCount;
var allDataSourceNames = virtualDataSource.GetAllDataSourceNames(); var allDataSourceNames = virtualDataSource.GetAllDataSourceNames();
foreach (var dataSourceName in allDataSourceNames) var partitionMigrationUnits = allDataSourceNames.Partition(compensateTableParallelCount);
foreach (var migrationUnits in partitionMigrationUnits)
{ {
dataSourceInitializer.InitConfigure(dataSourceName,true,true); var migrateUnits = migrationUnits.Select(o => new InitConfigureUnit(o)).ToList();
ExecuteInitConfigureUnit(dataSourceInitializer, migrateUnits);
} }
} }
private static void ExecuteInitConfigureUnit(IDataSourceInitializer dataSourceInitializer,
List<InitConfigureUnit> initConfigureUnits)
{
var initConfigureTasks = initConfigureUnits.Select(o =>
{
return Task.Run(() => { dataSourceInitializer.InitConfigure(o.DataSourceName, true, true); });
}).ToArray();
Task.WaitAll(initConfigureTasks);
}
//public static IServiceCollection AddSingleShardingDbContext<TShardingDbContext>(this IServiceCollection services, Action<ShardingConfigOptions> configure, //public static IServiceCollection AddSingleShardingDbContext<TShardingDbContext>(this IServiceCollection services, Action<ShardingConfigOptions> configure,
// Action<string, DbContextOptionsBuilder> optionsAction = null, // Action<string, DbContextOptionsBuilder> optionsAction = null,