添加对changetracker的支持 发布x.6.0.24

This commit is contained in:
xuejiaming 2022-08-02 17:25:06 +08:00
parent 9593b70743
commit 8a998f7d52
17 changed files with 360 additions and 77 deletions

View File

@ -14,6 +14,7 @@ using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Migrations.EFCores;
using ShardingCore;
using ShardingCore.Bootstrappers;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
namespace Sample.Migrations
@ -30,7 +31,6 @@ namespace Sample.Migrations
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
//services.AddDbContext<DefaultShardingTableDbContext>((sp, builder) =>

View File

@ -46,7 +46,7 @@ namespace Sample.MySql.Controllers
// Console.WriteLine("------------");
// using (var tran = _defaultTableDbContext.Database.BeginTransaction())
// {
var sysUserMods = _defaultTableDbContext.Set<SysUserMod>();
var resultX = await _defaultTableDbContext.Set<SysUserMod>()
.Where(o => o.Id == "2" || o.Id == "3").FirstOrDefaultAsync();
var resultY = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "2" || o.Id == "3");

View File

@ -91,7 +91,7 @@ namespace Sample.MySql
});
o.AddDefaultDataSource("ds0",
"server=127.0.0.1;port=3306;database=dbdbdx;userid=root;password=root;");
}).AddShardingCore();
}).ReplaceService<ITableEnsureManager, MySqlTableEnsureManager>().AddShardingCore();
services.AddSingleton<IShardingRuntimeContext>(sp =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
@ -196,21 +196,21 @@ namespace Sample.MySql
// var shardingRuntimeContext = app.ApplicationServices.GetRequiredService<IShardingRuntimeContext>();
// var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
// var entityMetadata = entityMetadataManager.TryGet<SysUserMod>();
// using (var scope = app.ApplicationServices.CreateScope())
// {
// var defaultShardingDbContext = scope.ServiceProvider.GetService<DefaultShardingDbContext>();
// // if (defaultShardingDbContext.Database.GetPendingMigrations().Any())
// {
// try
// {
//
// defaultShardingDbContext.Database.Migrate();
// }
// catch (Exception e)
// {
// }
// }
// }
using (var scope = app.ApplicationServices.CreateScope())
{
var defaultShardingDbContext = scope.ServiceProvider.GetService<DefaultShardingDbContext>();
// if (defaultShardingDbContext.Database.GetPendingMigrations().Any())
{
try
{
defaultShardingDbContext.Database.Migrate();
}
catch (Exception e)
{
}
}
}
// using (var scope = app.ApplicationServices.CreateScope())
// {
// var defaultShardingDbContext = scope.ServiceProvider.GetService<OtherDbContext>();

View File

@ -10,6 +10,8 @@ using ShardingCore;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using ShardingCore.Core.RuntimeContexts;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ReadWriteConfigurations;
@ -32,7 +34,6 @@ namespace Sample.SqlServer
{
services.AddControllers();
//services.AddDbContext<DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx3;Integrated Security=True"));
services.AddShardingDbContext<DefaultShardingDbContext>()
.UseRouteConfig(o =>
{

View File

@ -1,9 +1,10 @@
namespace ShardingCore.Bootstrappers
{
/// <summary>
/// 分片初始化器主要用来初始化元数据信息和平行表
/// </summary>
internal interface IShardingInitializer
{
/// <summary>
/// 初始化
/// </summary>

View File

@ -5,12 +5,9 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
namespace ShardingCore.Core.DbContextCreator
{
/*
* @Author: xjm
* @Description:
* @Date: Wednesday, 16 December 2020 16:15:43
* @Email: 326308290@qq.com
*/
/// <summary>
/// 用来实现dbcontext的创建,将RouteTail和DbContextOptions封装到一起
/// </summary>
public class ShardingDbContextOptions
{
public ShardingDbContextOptions(DbContextOptions dbContextOptions, IRouteTail routeTail)
@ -19,7 +16,13 @@ namespace ShardingCore.Core.DbContextCreator
DbContextOptions = dbContextOptions;
}
/// <summary>
/// 用来告诉ShardingCore创建的DbContext是什么后缀
/// </summary>
public IRouteTail RouteTail{ get; }
/// <summary>
/// 用来创建DbContext
/// </summary>
public DbContextOptions DbContextOptions { get; }
}
}

View File

@ -0,0 +1,58 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
namespace ShardingCore.Core.Internal
{
public class MultiEnumerator<TEntity>:IEnumerator<TEntity>
{
private readonly List<IEnumerator<TEntity>> _enumerators;
private int index;
private readonly int _enumeratorsCount;
public MultiEnumerator(IEnumerable<IEnumerator<TEntity>> enumerators)
{
_enumerators = enumerators.ToList();
_enumeratorsCount = _enumerators.Count;
index = 0;
}
public bool MoveNext()
{
if (_enumeratorsCount == 0)
{
return false;
}
if (index >= _enumeratorsCount)
{
return false;
}
while (index < _enumeratorsCount)
{
var moveNext = _enumerators[index].MoveNext();
if (moveNext)
{
return true;
}
index++;
}
return false;
}
public void Reset()
{
throw new System.NotImplementedException();
}
public TEntity Current => _enumerators[index].Current;
object IEnumerator.Current => Current;
public void Dispose()
{
_enumerators.Clear();
}
}
}

View File

@ -7,6 +7,9 @@ using ShardingCore.Core.ServiceProviders;
namespace ShardingCore.Core.ShardingConfigurations
{
/// <summary>
/// 分片配置
/// </summary>
public class ShardingConfigOptions
{
/// <summary>

View File

@ -0,0 +1,148 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.ChangeTracking.Internal;
using Microsoft.EntityFrameworkCore.Metadata;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores.ChangeTrackers
{
public class ShardingChangeTracker : ChangeTracker
{
private readonly DbContext _dbContext;
public ShardingChangeTracker(DbContext context, IStateManager stateManager, IChangeDetector changeDetector,
IModel model, IEntityEntryGraphIterator graphIterator) : base(context, stateManager, changeDetector, model,
graphIterator)
{
_dbContext = context;
}
#if !EFCORE2 && !EFCORE3 && !EFCORE5 && !EFCORE6
1
#endif
public override bool HasChanges()
{
if (_dbContext is ICurrentDbContextDiscover currentDbContextDiscover)
{
return currentDbContextDiscover.GetCurrentDbContexts().Any(o =>
o.Value.GetCurrentContexts().Any(r => r.Value.ChangeTracker.HasChanges()));
}
return base.HasChanges();
}
public override IEnumerable<EntityEntry> Entries()
{
if (_dbContext is ICurrentDbContextDiscover currentDbContextDiscover)
{
return currentDbContextDiscover.GetCurrentDbContexts().SelectMany(o =>
o.Value.GetCurrentContexts().SelectMany(cd => cd.Value.ChangeTracker.Entries()));
}
return base.Entries();
}
public override IEnumerable<EntityEntry<TEntity>> Entries<TEntity>()
{
if (_dbContext is ICurrentDbContextDiscover currentDbContextDiscover)
{
return currentDbContextDiscover.GetCurrentDbContexts().SelectMany(o =>
o.Value.GetCurrentContexts().SelectMany(cd => cd.Value.ChangeTracker.Entries<TEntity>()));
}
return base.Entries<TEntity>();
}
public override void DetectChanges()
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.DetectChanges());
return;
}
base.DetectChanges();
}
public override void AcceptAllChanges()
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.AcceptAllChanges());
return;
}
base.AcceptAllChanges();
}
private void Do(Action<ChangeTracker> action)
{
var dataSourceDbContexts = ((ICurrentDbContextDiscover)_dbContext).GetCurrentDbContexts();
foreach (var dataSourceDbContext in dataSourceDbContexts)
{
var currentContexts = dataSourceDbContext.Value.GetCurrentContexts();
foreach (var keyValuePair in currentContexts)
{
action(keyValuePair.Value.ChangeTracker);
}
}
}
public override void TrackGraph(object rootEntity, Action<EntityEntryGraphNode> callback)
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.TrackGraph(rootEntity,callback));
return;
}
base.TrackGraph(rootEntity, callback);
}
#if !EFCORE2
public override void TrackGraph<TState>(object rootEntity, TState state, Func<EntityEntryGraphNode<TState>, bool> callback) where TState : default
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.TrackGraph(rootEntity,state,callback));
return;
}
base.TrackGraph(rootEntity, state, callback);
}
public override void CascadeChanges()
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.CascadeChanges());
return;
}
base.CascadeChanges();
}
#endif
#if !EFCORE2 && !EFCORE3
public override void Clear()
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.Clear());
return;
}
base.Clear();
}
#endif
#if EFCORE2
public override void TrackGraph<TState>(object rootEntity, TState state, Func<EntityEntryGraphNode, TState, bool> callback)
{
if (_dbContext is ICurrentDbContextDiscover)
{
Do(c => c.TrackGraph(rootEntity,state,callback));
return;
}
base.TrackGraph(rootEntity, state, callback);
}
#endif
}
}

View File

@ -0,0 +1,33 @@
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.ChangeTracking.Internal;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Metadata;
namespace ShardingCore.EFCores.ChangeTrackers
{
public class ShardingChangeTrackerFactory:ChangeTrackerFactory
{
private readonly ICurrentDbContext _currentContext;
private readonly IStateManager _stateManager;
private readonly IChangeDetector _changeDetector;
private readonly IModel _model;
private readonly IEntityEntryGraphIterator _graphIterator;
public ShardingChangeTrackerFactory(ICurrentDbContext currentContext, IStateManager stateManager, IChangeDetector changeDetector, IModel model, IEntityEntryGraphIterator graphIterator) : base(currentContext, stateManager, changeDetector, model, graphIterator)
{
_currentContext = currentContext;
_stateManager = stateManager;
_changeDetector = changeDetector;
_model = model;
_graphIterator = graphIterator;
}
public ChangeTracker Create()
{
return new ShardingChangeTracker(_currentContext.Context, _stateManager, _changeDetector, _model,
_graphIterator);
}
}
}

View File

@ -2,7 +2,9 @@ using Microsoft.EntityFrameworkCore;
namespace ShardingCore.EFCores
{
/// <summary>
/// 迁移执行单元
/// </summary>
public class MigrateUnit
{
public MigrateUnit(DbContext shellDbContext, string dataSourceName)
@ -10,8 +12,13 @@ namespace ShardingCore.EFCores
ShellDbContext = shellDbContext;
DataSourceName = dataSourceName;
}
/// <summary>
/// 壳dbcontext
/// </summary>
public DbContext ShellDbContext { get; }
/// <summary>
/// 数据源名称
/// </summary>
public string DataSourceName { get; }
}
}

View File

@ -1,41 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.ChangeTracking.Internal;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
{
public class ShardingChangeTracker: ChangeTracker
{
private readonly ICurrentDbContextDiscover _contextDiscover;
public ShardingChangeTracker(DbContext context, IStateManager stateManager, IChangeDetector changeDetector, IModel model, IEntityEntryGraphIterator graphIterator) : base(context, stateManager, changeDetector, model, graphIterator)
{
_contextDiscover = context as ICurrentDbContextDiscover?? throw new ShardingCoreNotSupportException($"{context.GetType()} not impl {nameof(ICurrentDbContextDiscover)}");
}
public override bool HasChanges()
{
return _contextDiscover.GetCurrentDbContexts().Any(o =>
o.Value.GetCurrentContexts().Any(r => r.Value.ChangeTracker.HasChanges()));
}
public override IEnumerable<EntityEntry> Entries()
{
return _contextDiscover.GetCurrentDbContexts().SelectMany(o => o.Value.GetCurrentContexts().SelectMany(cd=>cd.Value.ChangeTracker.Entries()));
}
public override IEnumerable<EntityEntry<TEntity>> Entries<TEntity>()
{
return _contextDiscover.GetCurrentDbContexts().SelectMany(o => o.Value.GetCurrentContexts().SelectMany(cd=>cd.Value.ChangeTracker.Entries<TEntity>()));
}
}
}

View File

@ -31,6 +31,7 @@ namespace ShardingCore.EFCores
{
private readonly IShardingDbContext _context;
private readonly IShardingRuntimeContext _shardingRuntimeContext;
private LocalView<TEntity>? _localView;
#if EFCORE5 || EFCORE6
@ -63,6 +64,19 @@ namespace ShardingCore.EFCores
}
}
public override LocalView<TEntity> Local
{
get
{
if (((DbContext)_context).ChangeTracker.AutoDetectChangesEnabled)
{
((DbContext)_context).ChangeTracker.DetectChanges();
}
return _localView ??= new ShardingLocalView<TEntity>(this);
}
}
private ITableRouteManager _tableRouteManager;
protected ITableRouteManager TableRouteManager

View File

@ -0,0 +1,33 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using ShardingCore.Core.Internal;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.EFCores
{
public class ShardingLocalView<TEntity>:LocalView<TEntity> where TEntity : class
{
private readonly DbContext _dbContext;
public ShardingLocalView(DbSet<TEntity> set) : base(set)
{
_dbContext = set.GetService<ICurrentDbContext>().Context;
}
public override IEnumerator<TEntity> GetEnumerator()
{
if (_dbContext is ICurrentDbContextDiscover currentDbContextDiscover)
{
var dataSourceDbContexts = currentDbContextDiscover.GetCurrentDbContexts();
var enumerators = dataSourceDbContexts.SelectMany(o => o.Value.GetCurrentContexts().Select(cd=>cd.Value.Set<TEntity>().Local.GetEnumerator()));
return new MultiEnumerator<TEntity>(enumerators);
}
return base.GetEnumerator();
}
}
}

View File

@ -4,11 +4,30 @@ using Microsoft.Extensions.Logging;
namespace ShardingCore.Jobs.Abstaractions
{
/// <summary>
/// 任务接口对应的路由实现后可以加入到默认的内存队列中定时执行
/// </summary>
public interface IJob
{
/// <summary>
/// 任务名称
/// </summary>
string JobName { get; }
/// <summary>
/// 执行的周期
/// </summary>
/// <returns></returns>
string[] GetJobCronExpressions();
/// <summary>
/// 如何执行任务
/// </summary>
/// <returns></returns>
Task ExecuteAsync();
/// <summary>
/// 任务是否需要添加到默认的任务里面
/// 当然也可以自行处理
/// </summary>
/// <returns></returns>
bool AppendJob();
}
}

View File

@ -25,7 +25,7 @@ namespace ShardingCore.Sharding
/// <summary>
/// 分表分库的dbcontext
/// </summary>
public abstract class AbstractShardingDbContext : DbContext, IShardingDbContext, ISupportShardingReadWrite//,ICurrentDbContextDiscover
public abstract class AbstractShardingDbContext : DbContext, IShardingDbContext, ISupportShardingReadWrite,ICurrentDbContextDiscover
{
protected IShardingDbContextExecutor ShardingDbContextExecutor { get; }
@ -501,10 +501,10 @@ namespace ShardingCore.Sharding
ShardingDbContextExecutor.Commit();
}
//public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
//{
// return ShardingDbContextExecutor.GetCurrentDbContexts();
//}
public IDictionary<string, IDataSourceDbContext> GetCurrentDbContexts()
{
return ShardingDbContextExecutor.GetCurrentDbContexts();
}
}
}

View File

@ -27,6 +27,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.ChangeTracking.Internal;
using Microsoft.EntityFrameworkCore.Migrations;
using ShardingCore.Bootstrappers;
using ShardingCore.Core.DbContextCreator;
@ -42,6 +43,7 @@ using ShardingCore.Core.VirtualRoutes.Abstractions;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.DynamicDataSources;
using ShardingCore.EFCores.ChangeTrackers;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.MergeContexts;
@ -143,7 +145,7 @@ namespace ShardingCore
dbContextOptionsBuilder.UseDefaultSharding<TShardingDbContext>(shardingRuntimeContext);
}
public static void UseDefaultSharding<TShardingDbContext>(this DbContextOptionsBuilder dbContextOptionsBuilder,
public static DbContextOptionsBuilder UseDefaultSharding<TShardingDbContext>(this DbContextOptionsBuilder dbContextOptionsBuilder,
IShardingRuntimeContext shardingRuntimeContext) where TShardingDbContext : DbContext, IShardingDbContext
{
var shardingConfigOptions = shardingRuntimeContext.GetShardingConfigOptions();
@ -156,6 +158,7 @@ namespace ShardingCore
.UseSharding<TShardingDbContext>(shardingRuntimeContext);
virtualDataSource.ConfigurationParams.UseShellDbContextOptionBuilder(contextOptionsBuilder);
return dbContextOptionsBuilder;
}
internal static IServiceCollection AddInternalShardingCore<TShardingDbContext>(this IServiceCollection services)
@ -240,6 +243,7 @@ namespace ShardingCore
return optionsBuilder.UseShardingWrapMark().UseShardingOptions(shardingRuntimeContext)
.ReplaceService<IDbSetSource, ShardingDbSetSource>()
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
.ReplaceService<IChangeTrackerFactory, ShardingChangeTrackerFactory>()
.ReplaceService<IDbContextTransactionManager,
ShardingRelationalTransactionManager<TShardingDbContext>>()
.ReplaceService<IRelationalTransactionFactory,