添加按字段分片自动动态
This commit is contained in:
parent
ab712361e7
commit
83bbe85652
|
@ -0,0 +1,9 @@
|
||||||
|
namespace Sample.AutoCreateIfPresent
|
||||||
|
{
|
||||||
|
public class AreaDevice
|
||||||
|
{
|
||||||
|
public string Id { get; set; }
|
||||||
|
public DateTime CreateTime { get; set; }
|
||||||
|
public string Area { get; set; }
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Metadata.Builders;
|
||||||
|
|
||||||
|
namespace Sample.AutoCreateIfPresent
|
||||||
|
{
|
||||||
|
public class AreaDeviceMap:IEntityTypeConfiguration<AreaDevice>
|
||||||
|
{
|
||||||
|
public void Configure(EntityTypeBuilder<AreaDevice> builder)
|
||||||
|
{
|
||||||
|
builder.HasKey(o => o.Id);
|
||||||
|
builder.Property(o => o.Id).IsRequired().HasMaxLength(50);
|
||||||
|
builder.Property(o => o.Area).IsRequired().HasMaxLength(128);
|
||||||
|
builder.ToTable(nameof(AreaDevice));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,146 @@
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Linq.Expressions;
|
||||||
|
using MySqlConnector;
|
||||||
|
using ShardingCore.Core.EntityMetadatas;
|
||||||
|
using ShardingCore.Core.PhysicTables;
|
||||||
|
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
|
||||||
|
using ShardingCore.Core.VirtualDatabase.VirtualTables;
|
||||||
|
using ShardingCore.Core.VirtualRoutes;
|
||||||
|
using ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions;
|
||||||
|
using ShardingCore.Exceptions;
|
||||||
|
using ShardingCore.Extensions;
|
||||||
|
using ShardingCore.TableCreator;
|
||||||
|
|
||||||
|
namespace Sample.AutoCreateIfPresent
|
||||||
|
{
|
||||||
|
public class AreaDeviceRoute : AbstractShardingOperatorVirtualTableRoute<AreaDevice, string>
|
||||||
|
{
|
||||||
|
private const string Tables = "Tables";
|
||||||
|
private const string TABLE_SCHEMA = "TABLE_SCHEMA";
|
||||||
|
private const string TABLE_NAME = "TABLE_NAME";
|
||||||
|
|
||||||
|
private const string CurrentTableName = nameof(AreaDevice);
|
||||||
|
private readonly IVirtualDataSourceManager<DefaultDbContext> _virtualDataSourceManager;
|
||||||
|
private readonly IVirtualTableManager<DefaultDbContext> _virtualTableManager;
|
||||||
|
private readonly IShardingTableCreator<DefaultDbContext> _shardingTableCreator;
|
||||||
|
private readonly ConcurrentDictionary<string, object?> _tails = new ConcurrentDictionary<string, object?>();
|
||||||
|
private readonly object _lock = new object();
|
||||||
|
|
||||||
|
public AreaDeviceRoute(IVirtualDataSourceManager<DefaultDbContext> virtualDataSourceManager, IVirtualTableManager<DefaultDbContext> virtualTableManager, IShardingTableCreator<DefaultDbContext> shardingTableCreator)
|
||||||
|
{
|
||||||
|
_virtualDataSourceManager = virtualDataSourceManager;
|
||||||
|
_virtualTableManager = virtualTableManager;
|
||||||
|
_shardingTableCreator = shardingTableCreator;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public override string ShardingKeyToTail(object shardingKey)
|
||||||
|
{
|
||||||
|
return $"{shardingKey}";
|
||||||
|
}
|
||||||
|
|
||||||
|
public override List<string> GetAllTails()
|
||||||
|
{
|
||||||
|
//启动寻找有哪些表后缀
|
||||||
|
using (var connection = new MySqlConnection(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultConnectionString))
|
||||||
|
{
|
||||||
|
connection.Open();
|
||||||
|
var database = connection.Database;
|
||||||
|
|
||||||
|
using (var dataTable = connection.GetSchema(Tables))
|
||||||
|
{
|
||||||
|
for (int i = 0; i < dataTable.Rows.Count; i++)
|
||||||
|
{
|
||||||
|
var schema = dataTable.Rows[i][TABLE_SCHEMA];
|
||||||
|
if (database.Equals($"{schema}", StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
var tableName = dataTable.Rows[i][TABLE_NAME]?.ToString() ?? string.Empty;
|
||||||
|
if (tableName.StartsWith(CurrentTableName,StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
//如果没有下划线那么需要CurrentTableName.Length-1
|
||||||
|
_tails.TryAdd(tableName.Substring(CurrentTableName.Length), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return _tails.Keys.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void Configure(EntityMetadataTableBuilder<AreaDevice> builder)
|
||||||
|
{
|
||||||
|
builder.ShardingProperty(o => o.Area);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Expression<Func<string, bool>> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
|
||||||
|
{
|
||||||
|
var t = ShardingKeyToTail(shardingKey);
|
||||||
|
switch (shardingOperator)
|
||||||
|
{
|
||||||
|
case ShardingOperatorEnum.Equal: return tail => tail == t;
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
#if DEBUG
|
||||||
|
Console.WriteLine($"shardingOperator is not equal scan all table tail");
|
||||||
|
#endif
|
||||||
|
return tail => true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override IPhysicTable RouteWithValue(List<IPhysicTable> allPhysicTables, object shardingKey)
|
||||||
|
{
|
||||||
|
var shardingKeyToTail = ShardingKeyToTail(shardingKey);
|
||||||
|
|
||||||
|
if (!_tails.TryGetValue(shardingKeyToTail, out var _))
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
if (!_tails.TryGetValue(shardingKeyToTail, out var _))
|
||||||
|
{
|
||||||
|
var virtualTable = _virtualTableManager.GetVirtualTable(typeof(AreaDevice));
|
||||||
|
_virtualTableManager.AddPhysicTable(virtualTable, new DefaultPhysicTable(virtualTable, shardingKeyToTail));
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_shardingTableCreator.CreateTable<AreaDevice>(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultDataSourceName, shardingKeyToTail);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Console.WriteLine("尝试添加表失败" + ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
_tails.TryAdd(shardingKeyToTail, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var needRefresh = allPhysicTables.Count != _tails.Count;
|
||||||
|
if (needRefresh)
|
||||||
|
{
|
||||||
|
var virtualTable = _virtualTableManager.GetVirtualTable(typeof(AreaDevice));
|
||||||
|
foreach (var tail in _tails.Keys)
|
||||||
|
{
|
||||||
|
var hashSet = allPhysicTables.Select(o => o.Tail).ToHashSet();
|
||||||
|
if (!hashSet.Contains(tail))
|
||||||
|
{
|
||||||
|
var tables = virtualTable.GetAllPhysicTables();
|
||||||
|
var physicTable = tables.FirstOrDefault(o => o.Tail == tail);
|
||||||
|
if (physicTable != null)
|
||||||
|
{
|
||||||
|
allPhysicTables.Add(physicTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var physicTables = allPhysicTables.Where(o => o.Tail == shardingKeyToTail).ToList();
|
||||||
|
if (physicTables.IsEmpty())
|
||||||
|
{
|
||||||
|
throw new ShardingCoreException($"sharding key route not match {EntityMetadata.EntityType} -> [{EntityMetadata.ShardingTableProperty.Name}] ->【{shardingKey}】 all tails ->[{string.Join(",", allPhysicTables.Select(o => o.FullName))}]");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (physicTables.Count > 1)
|
||||||
|
throw new ShardingCoreException($"more than one route match table:{string.Join(",", physicTables.Select(o => $"[{o.FullName}]"))}");
|
||||||
|
return physicTables[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -49,4 +49,21 @@ public class WeatherForecastController : ControllerBase
|
||||||
await _defaultDbContext.SaveChangesAsync();
|
await _defaultDbContext.SaveChangesAsync();
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
|
public async Task<IActionResult> Query1()
|
||||||
|
{
|
||||||
|
var list = await _defaultDbContext.Set<AreaDevice>().ToListAsync();
|
||||||
|
return Ok(list);
|
||||||
|
}
|
||||||
|
public async Task<IActionResult> Insert1()
|
||||||
|
{
|
||||||
|
var list = new List<string>(){"A","B","C","D","E", "F", "G" };
|
||||||
|
var orderByHour = new AreaDevice();
|
||||||
|
orderByHour.Id = Guid.NewGuid().ToString("n");
|
||||||
|
orderByHour.Area = list[new Random().Next(0, list.Count)];
|
||||||
|
var dateTime = DateTime.Now;
|
||||||
|
orderByHour.CreateTime = dateTime.AddHours(new Random().Next(1, 20));
|
||||||
|
await _defaultDbContext.AddAsync(orderByHour);
|
||||||
|
await _defaultDbContext.SaveChangesAsync();
|
||||||
|
return Ok();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -22,6 +22,7 @@ namespace Sample.AutoCreateIfPresent
|
||||||
{
|
{
|
||||||
base.OnModelCreating(modelBuilder);
|
base.OnModelCreating(modelBuilder);
|
||||||
modelBuilder.ApplyConfiguration(new OrderByHourMap());
|
modelBuilder.ApplyConfiguration(new OrderByHourMap());
|
||||||
|
modelBuilder.ApplyConfiguration(new AreaDeviceMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
public IRouteTail RouteTail { get; set; }
|
public IRouteTail RouteTail { get; set; }
|
||||||
|
|
|
@ -28,6 +28,8 @@ namespace Sample.AutoCreateIfPresent
|
||||||
private const string Tables = "Tables";
|
private const string Tables = "Tables";
|
||||||
private const string TABLE_SCHEMA = "TABLE_SCHEMA";
|
private const string TABLE_SCHEMA = "TABLE_SCHEMA";
|
||||||
private const string TABLE_NAME = "TABLE_NAME";
|
private const string TABLE_NAME = "TABLE_NAME";
|
||||||
|
|
||||||
|
private const string CurrentTableName = nameof(OrderByHour);
|
||||||
private readonly IVirtualDataSourceManager<DefaultDbContext> _virtualDataSourceManager;
|
private readonly IVirtualDataSourceManager<DefaultDbContext> _virtualDataSourceManager;
|
||||||
private readonly IVirtualTableManager<DefaultDbContext> _virtualTableManager;
|
private readonly IVirtualTableManager<DefaultDbContext> _virtualTableManager;
|
||||||
private readonly IShardingTableCreator<DefaultDbContext> _shardingTableCreator;
|
private readonly IShardingTableCreator<DefaultDbContext> _shardingTableCreator;
|
||||||
|
@ -70,9 +72,10 @@ namespace Sample.AutoCreateIfPresent
|
||||||
if (database.Equals($"{schema}", StringComparison.OrdinalIgnoreCase))
|
if (database.Equals($"{schema}", StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
{
|
||||||
var tableName = dataTable.Rows[i][TABLE_NAME]?.ToString()??string.Empty;
|
var tableName = dataTable.Rows[i][TABLE_NAME]?.ToString()??string.Empty;
|
||||||
if (tableName.StartsWith(nameof(OrderByHour)))
|
if (tableName.StartsWith(CurrentTableName, StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
{
|
||||||
_tails.TryAdd(tableName.Replace($"{nameof(OrderByHour)}_",""),null);
|
//如果没有下划线那么需要CurrentTableName.Length-1
|
||||||
|
_tails.TryAdd(tableName.Substring(CurrentTableName.Length),null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,11 +24,12 @@ builder.Services.AddShardingDbContext<DefaultDbContext>()
|
||||||
o.CreateShardingTableOnStart = true;
|
o.CreateShardingTableOnStart = true;
|
||||||
o.EnsureCreatedWithOutShardingTable = true;
|
o.EnsureCreatedWithOutShardingTable = true;
|
||||||
o.AddShardingTableRoute<OrderByHourRoute>();
|
o.AddShardingTableRoute<OrderByHourRoute>();
|
||||||
|
o.AddShardingTableRoute<AreaDeviceRoute>();
|
||||||
})
|
})
|
||||||
.AddConfig(o =>
|
.AddConfig(o =>
|
||||||
{
|
{
|
||||||
o.ConfigId = "c1";
|
o.ConfigId = "c1";
|
||||||
o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
|
o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=L6yBtV6qNENrwBy7;");
|
||||||
o.UseShardingQuery((conn, b) =>
|
o.UseShardingQuery((conn, b) =>
|
||||||
{
|
{
|
||||||
b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
|
b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
|
||||||
|
|
|
@ -91,43 +91,43 @@ namespace Sample.BulkConsole
|
||||||
}
|
}
|
||||||
|
|
||||||
var b = DateTime.Now.Date.AddDays(-3);
|
var b = DateTime.Now.Date.AddDays(-3);
|
||||||
var queryable = myShardingDbContext.Set<Order>().Where(o => o.CreateTime >= b).OrderBy(o => o.CreateTime);
|
var queryable = myShardingDbContext.Set<Order>().Select(o=>new {Id=o.Id,OrderNo=o.OrderNo, CreateTime =o.CreateTime });//.Where(o => o.CreateTime >= b);
|
||||||
|
|
||||||
var startNew1 = Stopwatch.StartNew();
|
var startNew1 = Stopwatch.StartNew();
|
||||||
|
|
||||||
|
|
||||||
startNew1.Restart();
|
//startNew1.Restart();
|
||||||
var list2 = queryable.Take(1000).ToList();
|
//var list2 = queryable.Take(1000).ToList();
|
||||||
startNew1.Stop();
|
//startNew1.Stop();
|
||||||
Console.WriteLine($"获取1000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
//Console.WriteLine($"获取1000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
startNew1.Restart();
|
//startNew1.Restart();
|
||||||
var list = queryable.Take(10).ToList();
|
//var list = queryable.Take(10).ToList();
|
||||||
startNew1.Stop();
|
//startNew1.Stop();
|
||||||
Console.WriteLine($"获取10条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
//Console.WriteLine($"获取10条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
|
|
||||||
startNew1.Restart();
|
//startNew1.Restart();
|
||||||
var list1 = queryable.Take(100).ToList();
|
//var list1 = queryable.Take(100).ToList();
|
||||||
startNew1.Stop();
|
//startNew1.Stop();
|
||||||
Console.WriteLine($"获取100条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
//Console.WriteLine($"获取100条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
|
|
||||||
startNew1.Restart();
|
//startNew1.Restart();
|
||||||
var list3 = queryable.Take(10000).ToList();
|
//var list3 = queryable.Take(10000).ToList();
|
||||||
startNew1.Stop();
|
//startNew1.Stop();
|
||||||
Console.WriteLine($"获取100000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
//Console.WriteLine($"获取100000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
startNew1.Restart();
|
//startNew1.Restart();
|
||||||
var list4 = queryable.Take(20000).ToList();
|
//var list4 = queryable.Take(20000).ToList();
|
||||||
startNew1.Stop();
|
//startNew1.Stop();
|
||||||
Console.WriteLine($"获取20000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
//Console.WriteLine($"获取20000条用时:{startNew1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
|
|
||||||
int skip = 0, take = 1000;
|
int skip = 0, take = 1000;
|
||||||
for (int i = 20000; i < 30000; i++)
|
for (int i = 20; i < 30000; i++)
|
||||||
{
|
{
|
||||||
skip = take * i;
|
skip = take * i;
|
||||||
startNew1.Restart();
|
startNew1.Restart();
|
||||||
|
|
|
@ -17,6 +17,8 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
private readonly IAsyncEnumerator<T> _asyncSource;
|
private readonly IAsyncEnumerator<T> _asyncSource;
|
||||||
private readonly IEnumerator<T> _syncSource;
|
private readonly IEnumerator<T> _syncSource;
|
||||||
private bool skip;
|
private bool skip;
|
||||||
|
private readonly bool _asyncEnumerator;
|
||||||
|
private readonly bool _syncEnumerator;
|
||||||
|
|
||||||
public StreamMergeAsyncEnumerator(IAsyncEnumerator<T> asyncSource)
|
public StreamMergeAsyncEnumerator(IAsyncEnumerator<T> asyncSource)
|
||||||
{
|
{
|
||||||
|
@ -24,6 +26,7 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
throw new ArgumentNullException(nameof(_syncSource));
|
throw new ArgumentNullException(nameof(_syncSource));
|
||||||
|
|
||||||
_asyncSource = asyncSource;
|
_asyncSource = asyncSource;
|
||||||
|
_asyncEnumerator = asyncSource!=null;
|
||||||
skip = true;
|
skip = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +35,7 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
if (_asyncSource != null)
|
if (_asyncSource != null)
|
||||||
throw new ArgumentNullException(nameof(_asyncSource));
|
throw new ArgumentNullException(nameof(_asyncSource));
|
||||||
_syncSource = syncSource;
|
_syncSource = syncSource;
|
||||||
|
_syncEnumerator = syncSource!=null;
|
||||||
skip = true;
|
skip = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +51,7 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
#if !EFCORE2
|
#if !EFCORE2
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
{
|
{
|
||||||
if (_asyncSource != null)
|
if (_asyncEnumerator)
|
||||||
await _asyncSource.DisposeAsync();
|
await _asyncSource.DisposeAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,8 +83,8 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
|
|
||||||
public bool HasElement()
|
public bool HasElement()
|
||||||
{
|
{
|
||||||
if (_asyncSource != null) return null != _asyncSource.Current;
|
if (_asyncEnumerator) return null != _asyncSource.Current;
|
||||||
if (_syncSource != null) return null != _syncSource.Current;
|
if (_syncEnumerator) return null != _syncSource.Current;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,14 +101,14 @@ namespace ShardingCore.Sharding.Enumerators
|
||||||
{
|
{
|
||||||
if (skip)
|
if (skip)
|
||||||
return default;
|
return default;
|
||||||
if (_asyncSource != null) return _asyncSource.Current;
|
if (_asyncEnumerator) return _asyncSource.Current;
|
||||||
if (_syncSource != null) return _syncSource.Current;
|
if (_syncEnumerator) return _syncSource.Current;
|
||||||
return default;
|
return default;
|
||||||
}
|
}
|
||||||
public T GetReallyCurrent()
|
public T GetReallyCurrent()
|
||||||
{
|
{
|
||||||
if (_asyncSource != null) return _asyncSource.Current;
|
if (_asyncEnumerator) return _asyncSource.Current;
|
||||||
if (_syncSource != null) return _syncSource.Current;
|
if (_syncEnumerator) return _syncSource.Current;
|
||||||
return default;
|
return default;
|
||||||
}
|
}
|
||||||
#if EFCORE2
|
#if EFCORE2
|
||||||
|
|
|
@ -69,6 +69,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||||
//protected abstract IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor);
|
//protected abstract IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor);
|
||||||
protected abstract IExecutor<TResult> CreateExecutor<TResult>(bool async);
|
protected abstract IExecutor<TResult> CreateExecutor<TResult>(bool async);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// sql执行的路由最小单元
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
protected virtual IEnumerable<ISqlRouteUnit> GetDefaultSqlRouteUnits()
|
protected virtual IEnumerable<ISqlRouteUnit> GetDefaultSqlRouteUnits()
|
||||||
{
|
{
|
||||||
var streamMergeContext = GetStreamMergeContext();
|
var streamMergeContext = GetStreamMergeContext();
|
||||||
|
|
|
@ -5,6 +5,7 @@ using System.Collections;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using ShardingCore.Helpers;
|
using ShardingCore.Helpers;
|
||||||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||||
#if EFCORE2
|
#if EFCORE2
|
||||||
|
|
|
@ -36,7 +36,11 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
|
||||||
{
|
{
|
||||||
return _streamMergeContext;
|
return _streamMergeContext;
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 创建熔断器来中断符合查询的结果比如firstordefault只需要在顺序查询下查询到第一个
|
||||||
|
/// 如果不是顺序查询则需要所有表的第一个进行内存再次查询
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public abstract ICircuitBreaker CreateCircuitBreaker();
|
public abstract ICircuitBreaker CreateCircuitBreaker();
|
||||||
|
|
||||||
protected void Cancel()
|
protected void Cancel()
|
||||||
|
|
|
@ -31,29 +31,59 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
public class DataSourceDbContext<TShardingDbContext> : IDataSourceDbContext where TShardingDbContext : DbContext, IShardingDbContext
|
public class DataSourceDbContext<TShardingDbContext> : IDataSourceDbContext where TShardingDbContext : DbContext, IShardingDbContext
|
||||||
{
|
{
|
||||||
private static readonly IComparer<string> _comparer = new NoShardingFirstComparer();
|
private static readonly IComparer<string> _comparer = new NoShardingFirstComparer();
|
||||||
|
/// <summary>
|
||||||
|
/// 当前是否是默认的dbcontext 也就是不分片的dbcontext
|
||||||
|
/// </summary>
|
||||||
public bool IsDefault { get; }
|
public bool IsDefault { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// 当前同库有多少dbcontext了
|
||||||
|
/// </summary>
|
||||||
public int DbContextCount => _dataSourceDbContexts.Count;
|
public int DbContextCount => _dataSourceDbContexts.Count;
|
||||||
|
/// <summary>
|
||||||
|
/// dbcontext 创建接口
|
||||||
|
/// </summary>
|
||||||
private readonly IDbContextCreator<TShardingDbContext> _dbContextCreator;
|
private readonly IDbContextCreator<TShardingDbContext> _dbContextCreator;
|
||||||
|
/// <summary>
|
||||||
|
/// 实际的链接字符串管理者 用来提供查询和插入dbcontext的创建链接的获取
|
||||||
|
/// </summary>
|
||||||
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
|
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
|
||||||
|
/// <summary>
|
||||||
|
/// 当前的数据源是什么默认单数据源可以支持多数据源配置
|
||||||
|
/// </summary>
|
||||||
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
|
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 数据源名称
|
/// 数据源名称
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string DataSourceName { get; }
|
public string DataSourceName { get; }
|
||||||
|
/// <summary>
|
||||||
|
/// 数据源排序默认提交将未分片的数据库最先提交
|
||||||
|
/// </summary>
|
||||||
private SortedDictionary<string, DbContext> _dataSourceDbContexts =
|
private SortedDictionary<string, DbContext> _dataSourceDbContexts =
|
||||||
new SortedDictionary<string, DbContext>(_comparer);
|
new SortedDictionary<string, DbContext>(_comparer);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
private bool _isBeginTransaction => _shardingDbContext.Database.CurrentTransaction != null;
|
/// 是否开启了事务
|
||||||
private readonly DbContext _shardingDbContext;
|
/// </summary>
|
||||||
private IDbContextTransaction _shardingContextTransaction => _shardingDbContext?.Database?.CurrentTransaction;
|
private bool _isBeginTransaction => _shardingShellDbContext.Database.CurrentTransaction != null;
|
||||||
|
/// <summary>
|
||||||
|
/// shell dbcontext最外面的壳
|
||||||
|
/// </summary>
|
||||||
|
private readonly DbContext _shardingShellDbContext;
|
||||||
|
/// <summary>
|
||||||
|
/// 数据库事务
|
||||||
|
/// </summary>
|
||||||
|
private IDbContextTransaction _shardingContextTransaction => _shardingShellDbContext?.Database?.CurrentTransaction;
|
||||||
|
|
||||||
|
|
||||||
private readonly ILogger<DataSourceDbContext<TShardingDbContext>> _logger;
|
private readonly ILogger<DataSourceDbContext<TShardingDbContext>> _logger;
|
||||||
|
/// <summary>
|
||||||
|
/// 同库下公用一个db context options
|
||||||
|
/// </summary>
|
||||||
private DbContextOptions<TShardingDbContext> _dbContextOptions;
|
private DbContextOptions<TShardingDbContext> _dbContextOptions;
|
||||||
|
/// <summary>
|
||||||
|
/// 是否触发了并发如果是的话就报错
|
||||||
|
/// </summary>
|
||||||
private OneByOneChecker oneByOne = new OneByOneChecker();
|
private OneByOneChecker oneByOne = new OneByOneChecker();
|
||||||
|
|
||||||
private IDbContextTransaction CurrentDbContextTransaction => IsDefault
|
private IDbContextTransaction CurrentDbContextTransaction => IsDefault
|
||||||
|
@ -66,26 +96,26 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="dataSourceName"></param>
|
/// <param name="dataSourceName"></param>
|
||||||
/// <param name="isDefault"></param>
|
/// <param name="isDefault"></param>
|
||||||
/// <param name="shardingDbContext"></param>
|
/// <param name="shardingShellDbContext"></param>
|
||||||
/// <param name="dbContextCreator"></param>
|
/// <param name="dbContextCreator"></param>
|
||||||
/// <param name="actualConnectionStringManager"></param>
|
/// <param name="actualConnectionStringManager"></param>
|
||||||
public DataSourceDbContext(string dataSourceName,
|
public DataSourceDbContext(string dataSourceName,
|
||||||
bool isDefault,
|
bool isDefault,
|
||||||
DbContext shardingDbContext,
|
DbContext shardingShellDbContext,
|
||||||
IDbContextCreator<TShardingDbContext> dbContextCreator,
|
IDbContextCreator<TShardingDbContext> dbContextCreator,
|
||||||
ActualConnectionStringManager<TShardingDbContext> actualConnectionStringManager)
|
ActualConnectionStringManager<TShardingDbContext> actualConnectionStringManager)
|
||||||
{
|
{
|
||||||
DataSourceName = dataSourceName;
|
DataSourceName = dataSourceName;
|
||||||
IsDefault = isDefault;
|
IsDefault = isDefault;
|
||||||
_shardingDbContext = shardingDbContext;
|
_shardingShellDbContext = shardingShellDbContext;
|
||||||
_virtualDataSource = (IVirtualDataSource<TShardingDbContext>)((IShardingDbContext)shardingDbContext).GetVirtualDataSource();
|
_virtualDataSource = (IVirtualDataSource<TShardingDbContext>)((IShardingDbContext)shardingShellDbContext).GetVirtualDataSource();
|
||||||
_dbContextCreator = dbContextCreator;
|
_dbContextCreator = dbContextCreator;
|
||||||
_actualConnectionStringManager = actualConnectionStringManager;
|
_actualConnectionStringManager = actualConnectionStringManager;
|
||||||
_logger = ShardingContainer.GetService<ILogger<DataSourceDbContext<TShardingDbContext>>>();
|
_logger = ShardingContainer.GetService<ILogger<DataSourceDbContext<TShardingDbContext>>>();
|
||||||
|
|
||||||
}
|
}
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 不支持并发后期发现直接报错而不是用lock
|
/// 创建共享的数据源配置用来做事务 不支持并发后期发现直接报错
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private DbContextOptions<TShardingDbContext> CreateShareDbContextOptionsBuilder()
|
private DbContextOptions<TShardingDbContext> CreateShareDbContextOptionsBuilder()
|
||||||
|
@ -94,7 +124,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
{
|
{
|
||||||
return _dbContextOptions;
|
return _dbContextOptions;
|
||||||
}
|
}
|
||||||
|
//是否触发并发了
|
||||||
var acquired = oneByOne.Start();
|
var acquired = oneByOne.Start();
|
||||||
if (!acquired)
|
if (!acquired)
|
||||||
{
|
{
|
||||||
|
@ -102,16 +132,19 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
}
|
}
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
//先创建dbcontext option builder
|
||||||
var dbContextOptionsBuilder = CreateDbContextOptionBuilder();
|
var dbContextOptionsBuilder = CreateDbContextOptionBuilder();
|
||||||
|
|
||||||
if (IsDefault)
|
if (IsDefault)
|
||||||
{
|
{
|
||||||
var dbConnection = _shardingDbContext.Database.GetDbConnection();
|
//如果是默认的需要使用shell的dbconnection为了保证可以使用事务
|
||||||
|
var dbConnection = _shardingShellDbContext.Database.GetDbConnection();
|
||||||
_virtualDataSource.UseDbContextOptionsBuilder(dbConnection,
|
_virtualDataSource.UseDbContextOptionsBuilder(dbConnection,
|
||||||
dbContextOptionsBuilder);
|
dbContextOptionsBuilder);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
//不同数据库下的链接需要自行获取 如果当前没有dbcontext那么就是第一个,应该用链接字符串创建后续的用dbconnection创建
|
||||||
if (_dataSourceDbContexts.IsEmpty())
|
if (_dataSourceDbContexts.IsEmpty())
|
||||||
{
|
{
|
||||||
var connectionString =
|
var connectionString =
|
||||||
|
@ -160,7 +193,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
var cacheKey = routeTail.GetRouteTailIdentity();
|
var cacheKey = routeTail.GetRouteTailIdentity();
|
||||||
if (!_dataSourceDbContexts.TryGetValue(cacheKey, out var dbContext))
|
if (!_dataSourceDbContexts.TryGetValue(cacheKey, out var dbContext))
|
||||||
{
|
{
|
||||||
dbContext = _dbContextCreator.CreateDbContext(_shardingDbContext,CreateShareDbContextOptionsBuilder(), routeTail);
|
dbContext = _dbContextCreator.CreateDbContext(_shardingShellDbContext,CreateShareDbContextOptionsBuilder(), routeTail);
|
||||||
_dataSourceDbContexts.Add(cacheKey, dbContext);
|
_dataSourceDbContexts.Add(cacheKey, dbContext);
|
||||||
ShardingDbTransaction();
|
ShardingDbTransaction();
|
||||||
}
|
}
|
||||||
|
@ -213,7 +246,9 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 通知事务自动管理是否要清理还是开启还是加入事务
|
||||||
|
/// </summary>
|
||||||
public void NotifyTransaction()
|
public void NotifyTransaction()
|
||||||
{
|
{
|
||||||
if (!_isBeginTransaction)
|
if (!_isBeginTransaction)
|
||||||
|
@ -226,6 +261,9 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
JoinCurrentTransaction();
|
JoinCurrentTransaction();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 清理事务
|
||||||
|
/// </summary>
|
||||||
private void ClearTransaction()
|
private void ClearTransaction()
|
||||||
{
|
{
|
||||||
foreach (var dataSourceDbContext in _dataSourceDbContexts)
|
foreach (var dataSourceDbContext in _dataSourceDbContexts)
|
||||||
|
@ -250,6 +288,12 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
|
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 异步提交
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="acceptAllChangesOnSuccess"></param>
|
||||||
|
/// <param name="cancellationToken"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
|
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@ -261,12 +305,17 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
|
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 获取当前的后缀数据库字典数据
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
public IDictionary<string, DbContext> GetCurrentContexts()
|
public IDictionary<string, DbContext> GetCurrentContexts()
|
||||||
{
|
{
|
||||||
return _dataSourceDbContexts;
|
return _dataSourceDbContexts;
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 回滚数据
|
||||||
|
/// </summary>
|
||||||
public void Rollback()
|
public void Rollback()
|
||||||
{
|
{
|
||||||
if (IsDefault)
|
if (IsDefault)
|
||||||
|
@ -280,7 +329,10 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
_logger.LogError(e, "rollback error.");
|
_logger.LogError(e, "rollback error.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// <summary>
|
||||||
|
/// 提交数据
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="dataSourceCount">如果只有一个数据源那么就直接报错否则就忽略</param>
|
||||||
public void Commit(int dataSourceCount)
|
public void Commit(int dataSourceCount)
|
||||||
{
|
{
|
||||||
if (IsDefault)
|
if (IsDefault)
|
||||||
|
|
|
@ -32,6 +32,10 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
||||||
bool IsDefault { get; }
|
bool IsDefault { get; }
|
||||||
int DbContextCount { get; }
|
int DbContextCount { get; }
|
||||||
DbContext CreateDbContext(IRouteTail routeTail);
|
DbContext CreateDbContext(IRouteTail routeTail);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 通知事务自动管理是否要清理还是开启还是加入事务
|
||||||
|
/// </summary>
|
||||||
void NotifyTransaction();
|
void NotifyTransaction();
|
||||||
|
|
||||||
int SaveChanges(bool acceptAllChangesOnSuccess);
|
int SaveChanges(bool acceptAllChangesOnSuccess);
|
||||||
|
|
|
@ -38,7 +38,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
||||||
|
|
||||||
public TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
|
public TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
|
||||||
{
|
{
|
||||||
//如果根表达式为iqueryable表示需要迭代
|
//如果根表达式为tolist toarray getenumerator等表示需要迭代
|
||||||
if (mergeQueryCompilerContext.IsEnumerableQuery())
|
if (mergeQueryCompilerContext.IsEnumerableQuery())
|
||||||
{
|
{
|
||||||
return EnumerableExecute<TResult>(mergeQueryCompilerContext);
|
return EnumerableExecute<TResult>(mergeQueryCompilerContext);
|
||||||
|
|
Loading…
Reference in New Issue