修复bug
This commit is contained in:
parent
26fe6d225c
commit
058dae0d87
|
@ -151,7 +151,8 @@ namespace Sample.MySql
|
|||
app.UseDeveloperExceptionPage();
|
||||
}
|
||||
app.ApplicationServices.UseAutoShardingCreate();
|
||||
|
||||
var shardingRuntimeContext = app.ApplicationServices.GetRequiredService<IShardingRuntimeContext>();
|
||||
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
|
||||
using (var scope = app.ApplicationServices.CreateScope())
|
||||
{
|
||||
var defaultShardingDbContext = scope.ServiceProvider.GetService<DefaultShardingDbContext>();
|
||||
|
|
|
@ -116,7 +116,7 @@ namespace ShardingCore.Core.EntityMetadatas
|
|||
_logicTableCaches.TryAdd(metadata.LogicTableName, metadatas);
|
||||
}
|
||||
|
||||
if (metadatas.Any(o => o.EntityType != efEntityType.ClrType))
|
||||
if (metadatas.All(o => o.EntityType != efEntityType.ClrType))
|
||||
{
|
||||
metadatas.Add(metadata);
|
||||
return true;
|
||||
|
|
|
@ -33,24 +33,12 @@ namespace ShardingCore.EFCores
|
|||
public class ShardingMigrator:Migrator
|
||||
{
|
||||
private readonly IShardingRuntimeContext _shardingRuntimeContext;
|
||||
private readonly IVirtualDataSource _virtualDataSource;
|
||||
private readonly ShardingConfigOptions _shardingConfigOptions;
|
||||
private readonly IShardingProvider _shardingProvider;
|
||||
private readonly IDbContextCreator _dbContextCreator;
|
||||
private readonly IRouteTailFactory _routeTailFactory;
|
||||
private readonly IShardingMigrationManager _shardingMigrationManager;
|
||||
|
||||
|
||||
#if EFCORE6
|
||||
public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext,IMigrationsAssembly migrationsAssembly, IHistoryRepository historyRepository, IDatabaseCreator databaseCreator, IMigrationsSqlGenerator migrationsSqlGenerator, IRawSqlCommandBuilder rawSqlCommandBuilder, IMigrationCommandExecutor migrationCommandExecutor, IRelationalConnection connection, ISqlGenerationHelper sqlGenerationHelper, ICurrentDbContext currentContext, IModelRuntimeInitializer modelRuntimeInitializer, IDiagnosticsLogger<DbLoggerCategory.Migrations> logger, IRelationalCommandDiagnosticsLogger commandLogger, IDatabaseProvider databaseProvider) : base(migrationsAssembly, historyRepository, databaseCreator, migrationsSqlGenerator, rawSqlCommandBuilder, migrationCommandExecutor, connection, sqlGenerationHelper, currentContext, modelRuntimeInitializer, logger, commandLogger, databaseProvider)
|
||||
{
|
||||
_shardingRuntimeContext = shardingRuntimeContext;
|
||||
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
|
||||
_shardingConfigOptions = _shardingRuntimeContext.GetShardingConfigOptions();
|
||||
_shardingProvider = _shardingRuntimeContext.GetShardingProvider();
|
||||
_dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
|
||||
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
|
||||
_shardingMigrationManager = _shardingRuntimeContext.GetShardingMigrationManager();
|
||||
|
||||
}
|
||||
#endif
|
||||
|
@ -58,13 +46,6 @@ namespace ShardingCore.EFCores
|
|||
public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext, IMigrationsAssembly migrationsAssembly, IHistoryRepository historyRepository, IDatabaseCreator databaseCreator, IMigrationsSqlGenerator migrationsSqlGenerator, IRawSqlCommandBuilder rawSqlCommandBuilder, IMigrationCommandExecutor migrationCommandExecutor, IRelationalConnection connection, ISqlGenerationHelper sqlGenerationHelper, ICurrentDbContext currentContext, IConventionSetBuilder conventionSetBuilder, IDiagnosticsLogger<DbLoggerCategory.Migrations> logger, IDiagnosticsLogger<DbLoggerCategory.Database.Command> commandLogger, IDatabaseProvider databaseProvider) : base(migrationsAssembly, historyRepository, databaseCreator, migrationsSqlGenerator, rawSqlCommandBuilder, migrationCommandExecutor, connection, sqlGenerationHelper, currentContext, conventionSetBuilder, logger, commandLogger, databaseProvider)
|
||||
{
|
||||
_shardingRuntimeContext = shardingRuntimeContext;
|
||||
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
|
||||
_shardingConfigOptions = _shardingRuntimeContext.GetShardingConfigOptions();
|
||||
_shardingProvider = _shardingRuntimeContext.GetShardingProvider();
|
||||
_dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
|
||||
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
|
||||
_shardingMigrationManager = _shardingRuntimeContext.GetShardingMigrationManager();
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -72,13 +53,6 @@ namespace ShardingCore.EFCores
|
|||
public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext, IMigrationsAssembly migrationsAssembly, IHistoryRepository historyRepository, IDatabaseCreator databaseCreator, IMigrationsSqlGenerator migrationsSqlGenerator, IRawSqlCommandBuilder rawSqlCommandBuilder, IMigrationCommandExecutor migrationCommandExecutor, IRelationalConnection connection, ISqlGenerationHelper sqlGenerationHelper, ICurrentDbContext currentContext, IDiagnosticsLogger<DbLoggerCategory.Migrations> logger, IDiagnosticsLogger<DbLoggerCategory.Database.Command> commandLogger, IDatabaseProvider databaseProvider) : base(migrationsAssembly, historyRepository, databaseCreator, migrationsSqlGenerator, rawSqlCommandBuilder, migrationCommandExecutor, connection, sqlGenerationHelper, currentContext, logger, commandLogger, databaseProvider)
|
||||
{
|
||||
_shardingRuntimeContext = shardingRuntimeContext;
|
||||
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
|
||||
_shardingConfigOptions = _shardingRuntimeContext.GetShardingConfigOptions();
|
||||
_shardingProvider = _shardingRuntimeContext.GetShardingProvider();
|
||||
_dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
|
||||
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
|
||||
_shardingMigrationManager = _shardingRuntimeContext.GetShardingMigrationManager();
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -87,13 +61,6 @@ namespace ShardingCore.EFCores
|
|||
public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext, IMigrationsAssembly migrationsAssembly, IHistoryRepository historyRepository, IDatabaseCreator databaseCreator, IMigrationsSqlGenerator migrationsSqlGenerator, IRawSqlCommandBuilder rawSqlCommandBuilder, IMigrationCommandExecutor migrationCommandExecutor, IRelationalConnection connection, ISqlGenerationHelper sqlGenerationHelper, IDiagnosticsLogger<DbLoggerCategory.Migrations> logger, IDatabaseProvider databaseProvider) : base(migrationsAssembly, historyRepository, databaseCreator, migrationsSqlGenerator, rawSqlCommandBuilder, migrationCommandExecutor, connection, sqlGenerationHelper, logger, databaseProvider)
|
||||
{
|
||||
_shardingRuntimeContext = shardingRuntimeContext;
|
||||
_virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
|
||||
_shardingConfigOptions = _shardingRuntimeContext.GetShardingConfigOptions();
|
||||
_shardingProvider = _shardingRuntimeContext.GetShardingProvider();
|
||||
_dbContextCreator = _shardingRuntimeContext.GetDbContextCreator();
|
||||
_routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
|
||||
_shardingMigrationManager = _shardingRuntimeContext.GetShardingMigrationManager();
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -103,72 +70,13 @@ namespace ShardingCore.EFCores
|
|||
// base.Migrate(targetMigration);
|
||||
}
|
||||
|
||||
private DbContextOptions CreateDbContextOptions(Type dbContextType,string dataSourceName)
|
||||
{
|
||||
var dbContextOptionBuilder = DataSourceDbContext.CreateDbContextOptionBuilder(dbContextType);
|
||||
var connectionString = _virtualDataSource.GetConnectionString(dataSourceName);
|
||||
_virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
|
||||
_shardingConfigOptions.ShardingMigrationConfigure?.Invoke(dbContextOptionBuilder);
|
||||
//迁移
|
||||
dbContextOptionBuilder.UseShardingOptions(_shardingRuntimeContext);
|
||||
return dbContextOptionBuilder.Options;
|
||||
}
|
||||
public override async Task MigrateAsync(string targetMigration = null, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var defaultDataSourceName = _virtualDataSource.DefaultDataSourceName;
|
||||
var allDataSourceNames = _virtualDataSource.GetAllDataSourceNames();
|
||||
using (var scope=_shardingProvider.CreateScope())
|
||||
{
|
||||
using (var shellDbContext = _dbContextCreator.GetShellDbContext(scope.ServiceProvider))
|
||||
{
|
||||
var migrationParallelCount = _shardingConfigOptions.MigrationParallelCount;
|
||||
if (migrationParallelCount <= 0)
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException($"migration parallel count must >0");
|
||||
}
|
||||
//默认数据源需要最后执行 否则可能会导致异常的情况下GetPendingMigrations为空
|
||||
var partitionMigrationUnits = allDataSourceNames.Where(o=>o!=defaultDataSourceName).Partition(migrationParallelCount);
|
||||
foreach (var migrationUnits in partitionMigrationUnits)
|
||||
{
|
||||
var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList();
|
||||
await ExecuteMigrateUnitsAsync(migrateUnits);
|
||||
}
|
||||
await ExecuteMigrateUnitsAsync(new List<MigrateUnit>(){new MigrateUnit(shellDbContext,defaultDataSourceName)});
|
||||
}
|
||||
}
|
||||
var virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
|
||||
var allDataSourceNames = virtualDataSource.GetAllDataSourceNames();
|
||||
await DynamicShardingHelper.DynamicMigrateWithDataSourcesAsync(_shardingRuntimeContext, allDataSourceNames, null,cancellationToken);
|
||||
|
||||
}
|
||||
|
||||
private async Task ExecuteMigrateUnitsAsync(List<MigrateUnit> migrateUnits)
|
||||
{
|
||||
var migrateTasks = migrateUnits.Select(migrateUnit =>
|
||||
{
|
||||
return Task.Run( () =>
|
||||
{
|
||||
using (_shardingMigrationManager.CreateScope())
|
||||
{
|
||||
_shardingMigrationManager.Current.CurrentDataSourceName = migrateUnit.DataSourceName;
|
||||
|
||||
var dbContextOptions = CreateDbContextOptions(migrateUnit.ShellDbContext.GetType(),
|
||||
migrateUnit.DataSourceName);
|
||||
|
||||
using (var dbContext = _dbContextCreator.CreateDbContext(migrateUnit.ShellDbContext,
|
||||
new ShardingDbContextOptions(dbContextOptions,
|
||||
_routeTailFactory.Create(string.Empty, false))))
|
||||
{
|
||||
if (( dbContext.Database.GetPendingMigrations()).Any())
|
||||
{
|
||||
dbContext.Database.Migrate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return 1;
|
||||
|
||||
});
|
||||
}).ToArray();
|
||||
await TaskHelper.WhenAllFastFail(migrateTasks);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,11 +2,18 @@
|
|||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Core.DbContextCreator;
|
||||
using ShardingCore.Core.RuntimeContexts;
|
||||
using ShardingCore.Core.ShardingMigrations.Abstractions;
|
||||
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
|
||||
using ShardingCore.EFCores;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
|
||||
using ShardingCore.Sharding.ShardingDbContextExecutors;
|
||||
|
||||
namespace ShardingCore.Helpers
|
||||
{
|
||||
|
@ -44,6 +51,87 @@ namespace ShardingCore.Helpers
|
|||
virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(dataSourceName, connectionString, false));
|
||||
}
|
||||
|
||||
public static async Task DynamicMigrateWithDataSourcesAsync(IShardingRuntimeContext shardingRuntimeContext,
|
||||
List<string> allDataSourceNames,int? migrationParallelCount,CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var dbContextCreator = shardingRuntimeContext.GetDbContextCreator();
|
||||
var shardingProvider = shardingRuntimeContext.GetShardingProvider();
|
||||
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
|
||||
var defaultDataSourceName = shardingRuntimeContext.GetVirtualDataSource().DefaultDataSourceName;
|
||||
|
||||
using (var scope=shardingProvider.CreateScope())
|
||||
{
|
||||
using (var shellDbContext = dbContextCreator.GetShellDbContext(scope.ServiceProvider))
|
||||
{
|
||||
var parallelCount = migrationParallelCount ?? shardingConfigOptions.MigrationParallelCount;
|
||||
if (parallelCount <= 0)
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException($"migration parallel count must >0");
|
||||
}
|
||||
//默认数据源需要最后执行 否则可能会导致异常的情况下GetPendingMigrations为空
|
||||
var partitionMigrationUnits = allDataSourceNames.Where(o=>o!=defaultDataSourceName).Partition(parallelCount);
|
||||
foreach (var migrationUnits in partitionMigrationUnits)
|
||||
{
|
||||
var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList();
|
||||
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,migrateUnits,cancellationToken);
|
||||
}
|
||||
|
||||
//包含默认默认的单独最后一次处理
|
||||
if (allDataSourceNames.Contains(defaultDataSourceName))
|
||||
{
|
||||
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,new List<MigrateUnit>(){new MigrateUnit(shellDbContext,defaultDataSourceName)},cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task ExecuteMigrateUnitsAsync(IShardingRuntimeContext shardingRuntimeContext,List<MigrateUnit> migrateUnits,CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var shardingMigrationManager = shardingRuntimeContext.GetShardingMigrationManager();
|
||||
var dbContextCreator = shardingRuntimeContext.GetDbContextCreator();
|
||||
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
|
||||
var migrateTasks = migrateUnits.Select(migrateUnit =>
|
||||
{
|
||||
return Task.Run( () =>
|
||||
{
|
||||
using (shardingMigrationManager.CreateScope())
|
||||
{
|
||||
shardingMigrationManager.Current.CurrentDataSourceName = migrateUnit.DataSourceName;
|
||||
|
||||
var dbContextOptions = CreateDbContextOptions(shardingRuntimeContext,migrateUnit.ShellDbContext.GetType(),
|
||||
migrateUnit.DataSourceName);
|
||||
|
||||
using (var dbContext = dbContextCreator.CreateDbContext(migrateUnit.ShellDbContext,
|
||||
new ShardingDbContextOptions(dbContextOptions,
|
||||
routeTailFactory.Create(string.Empty, false))))
|
||||
{
|
||||
if (( dbContext.Database.GetPendingMigrations()).Any())
|
||||
{
|
||||
dbContext.Database.Migrate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return 1;
|
||||
|
||||
},cancellationToken);
|
||||
}).ToArray();
|
||||
await TaskHelper.WhenAllFastFail(migrateTasks);
|
||||
}
|
||||
|
||||
private static DbContextOptions CreateDbContextOptions(IShardingRuntimeContext shardingRuntimeContext,Type dbContextType,string dataSourceName)
|
||||
{
|
||||
var virtualDataSource = shardingRuntimeContext.GetVirtualDataSource();
|
||||
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
|
||||
var dbContextOptionBuilder = DataSourceDbContext.CreateDbContextOptionBuilder(dbContextType);
|
||||
var connectionString = virtualDataSource.GetConnectionString(dataSourceName);
|
||||
virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
|
||||
shardingConfigOptions.ShardingMigrationConfigure?.Invoke(dbContextOptionBuilder);
|
||||
//迁移
|
||||
dbContextOptionBuilder.UseShardingOptions(shardingRuntimeContext);
|
||||
return dbContextOptionBuilder.Options;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 动态添加读写分离链接字符串
|
||||
/// </summary>
|
||||
|
|
Loading…
Reference in New Issue