This commit is contained in:
parent
255398ba30
commit
ee0f6c1781
67
README.md
67
README.md
|
@ -291,9 +291,44 @@ AbstractSimpleShardingYearKeyLongVirtualTableRoute |按时间戳 |yyyy | `>,>=,<
|
|||
|
||||
## 批量操作
|
||||
|
||||
批量操作将对应的dbcontext和数据进行分离由用户自己选择第三方框架比如zzz进行批量操作或者batchextension
|
||||
批量操作将对应的dbcontext和数据进行分离由用户自己选择第三方框架比如[`Z.EntityFramework.Plus.EFCore`](https://github.com/zzzprojects/EntityFramework-Plus) 进行批量操作或者 [`EFCore.BulkExtensions`](https://github.com/borisdj/EFCore.BulkExtensions) ,支持一切三方批量框架
|
||||
```c#
|
||||
后期支持
|
||||
var list = new List<SysUserMod>();
|
||||
///通过集合返回出对应的k-v归集通过事务开启
|
||||
var dbContexts = _defaultTableDbContext.BulkShardingEnumerable(list);
|
||||
|
||||
using (var tran = _defaultTableDbContext.Database.BeginTransaction())
|
||||
{
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkInsert(kv.Value);
|
||||
});
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkDelete(kv.Value);
|
||||
});
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkUpdate(kv.Value);
|
||||
});
|
||||
_defaultTableDbContext.SaveChanges();
|
||||
tran.Commit();
|
||||
}
|
||||
|
||||
|
||||
var dbContext2s = _defaultTableDbContext.BulkShardingExpression<SysUserMod>(o => o.Age > 100);
|
||||
using (var tran = _defaultTableDbContext.Database.BeginTransaction())
|
||||
{
|
||||
dbContext2s.ForEach(dbContext =>
|
||||
{
|
||||
dbContext.Set<SysUserMod>().Where(o => o.Age > 100).Update(o => new SysUserMod()
|
||||
{
|
||||
AgeGroup = 1000
|
||||
});
|
||||
});
|
||||
_defaultTableDbContext.SaveChanges();
|
||||
tran.Commit();
|
||||
}
|
||||
```
|
||||
## 手动路由
|
||||
```c#
|
||||
|
@ -327,10 +362,20 @@ ctor inject IShardingRouteManager shardingRouteManager
|
|||
[参考](https://github.com/xuejmnet/sharding-core/tree/main/samples/Samples.AutoByDate.SqlServer)
|
||||
|
||||
## 事务
|
||||
默认savechanges支持事务
|
||||
1.默认savechanges支持事务
|
||||
```c#
|
||||
|
||||
await _defaultShardingDbContext.SaveChangesAsync();
|
||||
|
||||
|
||||
```
|
||||
2.手动开启事务 [请参考微软](https://docs.microsoft.com/zh-cn/ef/core/saving/transactions)
|
||||
```c#
|
||||
using (var tran = _defaultTableDbContext.Database.BeginTransaction())
|
||||
{
|
||||
........
|
||||
_defaultTableDbContext.SaveChanges();
|
||||
tran.Commit();
|
||||
}
|
||||
```
|
||||
## 读写分离
|
||||
该框架目前已经支持单node的读写分离,后续框架将支持多node的读
|
||||
|
@ -398,7 +443,19 @@ var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserMod>().Ord
|
|||
|
||||
|
||||
# 注意事项
|
||||
该库的追踪是基于adonet的MARS(MultipleActiveResultSets=True;)所以基本不支持该特性的无法支持完美追踪
|
||||
使用该框架需要注意两点如果你的shardingdbcontext重写了以下服务可能无法使用 如果还想使用需要自己重写扩展[请参考](https://github.com/xuejmnet/sharding-core/blob/main/src/ShardingCore/DIExtension.cs)
|
||||
1.shardingdbcontext
|
||||
```c#
|
||||
return optionsBuilder.ReplaceService<IDbSetSource, ShardingDbSetSource>()
|
||||
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
|
||||
.ReplaceService<IRelationalTransactionFactory, ShardingRelationalTransactionFactory>();
|
||||
```
|
||||
2.defaultdbcontext
|
||||
```c#
|
||||
return optionsBuilder.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>()
|
||||
.ReplaceService<IModelCustomizer, ShardingModelCustomizer<TShardingDbContext>>();
|
||||
|
||||
```
|
||||
,目前框架采用AppDomain.CurrentDomain.GetAssemblies();
|
||||
可能会导致程序集未被加载所以尽可能在api层加载所需要的dll
|
||||
使用时需要注意
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
:start
|
||||
::定义版本
|
||||
set EFCORE2=2.2.0.17
|
||||
set EFCORE3=3.2.0.17
|
||||
set EFCORE5=5.2.0.17
|
||||
set EFCORE2=2.2.0.19
|
||||
set EFCORE3=3.2.0.19
|
||||
set EFCORE5=5.2.0.19
|
||||
|
||||
::删除所有bin与obj下的文件
|
||||
@echo off
|
||||
|
|
|
@ -5,12 +5,14 @@ using System.Linq;
|
|||
using System.Threading.Tasks;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sample.SqlServer.DbContexts;
|
||||
using Sample.SqlServer.Domain.Entities;
|
||||
using ShardingCore.Core.QueryRouteManagers.Abstractions;
|
||||
using ShardingCore.DbContexts.VirtualDbContexts;
|
||||
using ShardingCore.Extensions;
|
||||
using Z.EntityFramework.Plus;
|
||||
|
||||
namespace Sample.SqlServer.Controllers
|
||||
{
|
||||
|
@ -26,7 +28,7 @@ namespace Sample.SqlServer.Controllers
|
|||
private readonly DefaultShardingDbContext _defaultTableDbContext;
|
||||
private readonly IShardingRouteManager _shardingRouteManager;
|
||||
|
||||
public ValuesController(DefaultShardingDbContext defaultTableDbContext,IShardingRouteManager shardingRouteManager)
|
||||
public ValuesController(DefaultShardingDbContext defaultTableDbContext, IShardingRouteManager shardingRouteManager)
|
||||
{
|
||||
_defaultTableDbContext = defaultTableDbContext;
|
||||
_shardingRouteManager = shardingRouteManager;
|
||||
|
@ -36,17 +38,17 @@ namespace Sample.SqlServer.Controllers
|
|||
public async Task<IActionResult> Get()
|
||||
{
|
||||
var sql = from ut in _defaultTableDbContext.Set<SysTest>()
|
||||
join u in _defaultTableDbContext.Set<SysUserMod>()
|
||||
on ut.UserId equals u.Id
|
||||
select new
|
||||
{
|
||||
ut.Id,
|
||||
userId=u.Id
|
||||
};
|
||||
var listAsync =await sql.ToListAsync();
|
||||
join u in _defaultTableDbContext.Set<SysUserMod>()
|
||||
on ut.UserId equals u.Id
|
||||
select new
|
||||
{
|
||||
ut.Id,
|
||||
userId = u.Id
|
||||
};
|
||||
var listAsync = await sql.ToListAsync();
|
||||
var resultx112331tt = await _defaultTableDbContext.Set<SysTest>().CountAsync();
|
||||
var resultx112331 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync();
|
||||
var resultx11233411 = _defaultTableDbContext.Set<SysUserMod>().Count();
|
||||
var resultx11233411 = _defaultTableDbContext.Set<SysUserMod>().Count();
|
||||
var resultx11231 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).ContainsAsync("1981");
|
||||
var resultx1121 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").SumAsync(o => o.Age);
|
||||
var resultx111 = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "198");
|
||||
|
@ -88,11 +90,11 @@ namespace Sample.SqlServer.Controllers
|
|||
return Ok();
|
||||
}
|
||||
[HttpGet]
|
||||
public async Task<IActionResult> Get1([FromQuery] int p,[FromQuery]int s)
|
||||
public async Task<IActionResult> Get1([FromQuery] int p, [FromQuery] int s)
|
||||
{
|
||||
Stopwatch sp = new Stopwatch();
|
||||
sp.Start();
|
||||
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToShardingPageAsync(p, s);
|
||||
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserMod>().OrderBy(o => o.Age).ToShardingPageAsync(p, s);
|
||||
sp.Stop();
|
||||
return Ok(new
|
||||
{
|
||||
|
@ -101,11 +103,11 @@ namespace Sample.SqlServer.Controllers
|
|||
});
|
||||
}
|
||||
[HttpGet]
|
||||
public IActionResult Get2([FromQuery] int p,[FromQuery]int s)
|
||||
public IActionResult Get2([FromQuery] int p, [FromQuery] int s)
|
||||
{
|
||||
Stopwatch sp = new Stopwatch();
|
||||
sp.Start();
|
||||
var shardingPageResultAsync = _defaultTableDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToShardingPage(p, s);
|
||||
var shardingPageResultAsync = _defaultTableDbContext.Set<SysUserMod>().OrderBy(o => o.Age).ToShardingPage(p, s);
|
||||
sp.Stop();
|
||||
return Ok(new
|
||||
{
|
||||
|
@ -113,5 +115,47 @@ namespace Sample.SqlServer.Controllers
|
|||
shardingPageResultAsync
|
||||
});
|
||||
}
|
||||
[HttpGet]
|
||||
public IActionResult Get3()
|
||||
{
|
||||
|
||||
var dbContext2s = _defaultTableDbContext.BulkShardingExpression<SysUserMod>(o => o.Age > 100);
|
||||
using (var tran = _defaultTableDbContext.Database.BeginTransaction())
|
||||
{
|
||||
dbContext2s.ForEach(dbContext =>
|
||||
{
|
||||
dbContext.Set<SysUserMod>().Where(o => o.Age > 100).Update(o => new SysUserMod()
|
||||
{
|
||||
AgeGroup = 1000
|
||||
});
|
||||
});
|
||||
_defaultTableDbContext.SaveChanges();
|
||||
tran.Commit();
|
||||
}
|
||||
var list = new List<SysUserMod>();
|
||||
var dbContexts = _defaultTableDbContext.BulkShardingEnumerable(list);
|
||||
|
||||
using (var tran = _defaultTableDbContext.Database.BeginTransaction())
|
||||
{
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkInsert(kv.Value);
|
||||
});
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkDelete(kv.Value);
|
||||
});
|
||||
dbContexts.ForEach(kv =>
|
||||
{
|
||||
kv.Key.BulkUpdate(kv.Value);
|
||||
});
|
||||
_defaultTableDbContext.SaveChanges();
|
||||
tran.Commit();
|
||||
}
|
||||
|
||||
|
||||
return Ok();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,10 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="EFCore.BulkExtensions" Version="5.3.8" />
|
||||
<PackageReference Include="EfCore.SqlServer2008Query" Version="1.0.4" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.9" />
|
||||
<PackageReference Include="Z.EntityFramework.Plus.EFCore" Version="5.2.9" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
|
|
@ -3,12 +3,14 @@ using Microsoft.AspNetCore.Hosting;
|
|||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Query;
|
||||
using Microsoft.EntityFrameworkCore.SqlServer.Query.Internal;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sample.SqlServer.DbContexts;
|
||||
using Sample.SqlServer.Shardings;
|
||||
using ShardingCore;
|
||||
using ShardingCore.EFCores;
|
||||
|
||||
namespace Sample.SqlServer
|
||||
{
|
||||
|
@ -30,8 +32,8 @@ namespace Sample.SqlServer
|
|||
o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;")
|
||||
, op =>
|
||||
{
|
||||
op.EnsureCreatedWithOutShardingTable = false;
|
||||
op.CreateShardingTableOnStart = false;
|
||||
op.EnsureCreatedWithOutShardingTable = true;
|
||||
op.CreateShardingTableOnStart = true;
|
||||
op.UseShardingOptionsBuilder(
|
||||
(connection, builder) => builder.UseSqlServer(connection).UseLoggerFactory(efLogger),//使用dbconnection创建dbcontext支持事务
|
||||
(conStr,builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Abp.EntityFrameworkCore;
|
||||
|
@ -21,7 +23,7 @@ using ShardingCore.Sharding.Abstractions;
|
|||
|
||||
namespace Samples.AbpSharding
|
||||
{
|
||||
public abstract class AbstractShardingAbpDbContext<T> : AbpDbContext, IShardingTableDbContext<T> where T : AbpDbContext, IShardingTableDbContext
|
||||
public abstract class AbstractShardingAbpDbContext<T> : AbpDbContext, IShardingDbContext<T> where T : AbpDbContext, IShardingTableDbContext
|
||||
{
|
||||
|
||||
|
||||
|
@ -127,6 +129,28 @@ namespace Samples.AbpSharding
|
|||
|
||||
return GetDbContext(true, _routeTailFactory.Create(tail));
|
||||
}
|
||||
|
||||
public IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> where)
|
||||
where TEntity : class
|
||||
{
|
||||
if (typeof(TEntity).IsShardingTable())
|
||||
{
|
||||
var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, typeof(TEntity)).RouteTo(new TableRouteConfig(predicate:where));
|
||||
if (physicTable.IsEmpty())
|
||||
throw new ShardingCoreException($"{where.ShardingPrint()} cant found any physic table");
|
||||
return physicTable.Select(o => GetDbContext(true, _routeTailFactory.Create(o.Tail)));
|
||||
}
|
||||
else
|
||||
{
|
||||
return new[] {GetDbContext(true, _routeTailFactory.Create(string.Empty))};
|
||||
}
|
||||
}
|
||||
|
||||
public void UseShardingTransaction(DbTransaction transaction)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public override EntityEntry Add(object entity)
|
||||
{
|
||||
return CreateGenericDbContext(entity).Add(entity);
|
||||
|
|
|
@ -12,6 +12,7 @@ using ShardingCore.Sharding;
|
|||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.TableCreator;
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using ShardingCore.Core.QueryRouteManagers;
|
||||
using ShardingCore.Core.QueryRouteManagers.Abstractions;
|
||||
using ShardingCore.Core.ShardingPage;
|
||||
|
@ -38,7 +39,7 @@ namespace ShardingCore
|
|||
ServiceLifetime contextLifetime = ServiceLifetime.Scoped,
|
||||
ServiceLifetime optionsLifetime = ServiceLifetime.Scoped)
|
||||
where TActualDbContext : DbContext, IShardingTableDbContext
|
||||
where TShardingDbContext : DbContext, IShardingTableDbContext<TActualDbContext>
|
||||
where TShardingDbContext : DbContext, IShardingDbContext<TActualDbContext>
|
||||
{
|
||||
if (configure == null)
|
||||
throw new ArgumentNullException($"AddShardingDbContext params is null :{nameof(configure)}");
|
||||
|
@ -60,7 +61,13 @@ namespace ShardingCore
|
|||
Action<DbContextOptionsBuilder> shardingOptionAction = option =>
|
||||
{
|
||||
optionsAction?.Invoke(option);
|
||||
#if !EFCORE2
|
||||
option.UseSharding();
|
||||
|
||||
#endif
|
||||
#if EFCORE2
|
||||
option.UseSharding<TShardingDbContext>();
|
||||
#endif
|
||||
};
|
||||
services.AddDbContext<TShardingDbContext>(shardingOptionAction, contextLifetime, optionsLifetime);
|
||||
services.AddInternalShardingCore();
|
||||
|
@ -75,7 +82,7 @@ namespace ShardingCore
|
|||
ServiceLifetime contextLifetime = ServiceLifetime.Scoped,
|
||||
ServiceLifetime optionsLifetime = ServiceLifetime.Scoped)
|
||||
where TActualDbContext : DbContext, IShardingTableDbContext
|
||||
where TShardingDbContext : DbContext, IShardingTableDbContext<TActualDbContext>
|
||||
where TShardingDbContext : DbContext, IShardingDbContext<TActualDbContext>
|
||||
{
|
||||
if (configure == null)
|
||||
throw new ArgumentNullException($"AddShardingDbContext params is null :{nameof(configure)}");
|
||||
|
@ -97,7 +104,13 @@ namespace ShardingCore
|
|||
Action<IServiceProvider, DbContextOptionsBuilder> shardingOptionAction = (sp, option) =>
|
||||
{
|
||||
optionsAction?.Invoke(sp,option);
|
||||
#if !EFCORE2
|
||||
option.UseSharding();
|
||||
|
||||
#endif
|
||||
#if EFCORE2
|
||||
option.UseSharding<TShardingDbContext>();
|
||||
#endif
|
||||
};
|
||||
services.AddDbContext<TShardingDbContext>(shardingOptionAction, contextLifetime, optionsLifetime);
|
||||
services.AddInternalShardingCore();
|
||||
|
@ -132,12 +145,25 @@ namespace ShardingCore
|
|||
services.AddSingleton<IShardingBootstrapper, ShardingBootstrapper>();
|
||||
return services;
|
||||
}
|
||||
|
||||
#if !EFCORE2
|
||||
internal static DbContextOptionsBuilder UseSharding(this DbContextOptionsBuilder optionsBuilder)
|
||||
{
|
||||
return optionsBuilder.ReplaceService<IDbSetSource, ShardingDbSetSource>()
|
||||
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>();
|
||||
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
|
||||
.ReplaceService<IRelationalTransactionFactory, ShardingRelationalTransactionFactory>();
|
||||
}
|
||||
|
||||
#endif
|
||||
#if EFCORE2
|
||||
internal static DbContextOptionsBuilder UseSharding<TShardingDbContext>(this DbContextOptionsBuilder optionsBuilder) where TShardingDbContext : DbContext, IShardingDbContext
|
||||
{
|
||||
return optionsBuilder.ReplaceService<IDbSetSource, ShardingDbSetSource>()
|
||||
.ReplaceService<IQueryCompiler, ShardingQueryCompiler>()
|
||||
.ReplaceService<IRelationalTransactionFactory, ShardingRelationalTransactionFactory<TShardingDbContext>>();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
internal static DbContextOptionsBuilder UseInnerDbContextSharding<TShardingDbContext>(this DbContextOptionsBuilder optionsBuilder) where TShardingDbContext:DbContext,IShardingDbContext
|
||||
{
|
||||
return optionsBuilder.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>()
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
#if EFCORE2
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace ShardingCore.DbContexts
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/9/5 21:29:47
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
class EFCore2DbContextLocation
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -0,0 +1,225 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.Data.Common;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Internal;
|
||||
using Microsoft.EntityFrameworkCore.Query.Internal;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using Microsoft.EntityFrameworkCore.Storage.Internal;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
|
||||
namespace ShardingCore.EFCores
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/9/5 15:41:20
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public class ShardingRelationalConnection : IRelationalConnection
|
||||
{
|
||||
private readonly IRelationalConnection _relationalConnection;
|
||||
|
||||
|
||||
#if !EFCORE2
|
||||
public ShardingRelationalConnection(IRelationalConnection _relationalConnection, DbTransaction transaction)
|
||||
{
|
||||
this._relationalConnection = _relationalConnection;
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
}
|
||||
|
||||
#endif
|
||||
#if EFCORE2
|
||||
private readonly Type _dbContextType;
|
||||
public ShardingRelationalConnection(IRelationalConnection _relationalConnection,DbTransaction transaction,Type dbContextType)
|
||||
{
|
||||
this._relationalConnection = _relationalConnection;
|
||||
_dbContextType = dbContextType;
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
}
|
||||
#endif
|
||||
|
||||
public void ResetState()
|
||||
{
|
||||
_relationalConnection.ResetState();
|
||||
}
|
||||
|
||||
#if !EFCORE2
|
||||
public Task ResetStateAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return _relationalConnection.ResetStateAsync(cancellationToken);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
public IDbContextTransaction BeginTransaction()
|
||||
{
|
||||
return _relationalConnection.BeginTransaction();
|
||||
}
|
||||
|
||||
public Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return _relationalConnection.BeginTransactionAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public void CommitTransaction()
|
||||
{
|
||||
_relationalConnection.CommitTransaction();
|
||||
}
|
||||
|
||||
|
||||
public void RollbackTransaction()
|
||||
{
|
||||
_relationalConnection.RollbackTransaction();
|
||||
}
|
||||
#if EFCORE5
|
||||
public IDbContextTransaction UseTransaction(DbTransaction transaction, Guid transactionId)
|
||||
{
|
||||
var dbContextTransaction = _relationalConnection.UseTransaction(transaction, transactionId);
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
return dbContextTransaction;
|
||||
}
|
||||
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, Guid transactionId,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, transactionId, cancellationToken);
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
return dbContextTransaction;
|
||||
}
|
||||
|
||||
public Task CommitTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return _relationalConnection.CommitTransactionAsync(cancellationToken);
|
||||
}
|
||||
public Task RollbackTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return _relationalConnection.RollbackTransactionAsync(cancellationToken);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if !EFCORE5
|
||||
public bool IsMultipleActiveResultSetsEnabled => _relationalConnection.IsMultipleActiveResultSetsEnabled;
|
||||
|
||||
|
||||
# endif
|
||||
IDbContextTransaction IRelationalConnection.CurrentTransaction => _relationalConnection.CurrentTransaction;
|
||||
|
||||
IDbContextTransaction IDbContextTransactionManager.CurrentTransaction => _relationalConnection.CurrentTransaction;
|
||||
|
||||
|
||||
public SemaphoreSlim Semaphore => _relationalConnection.Semaphore;
|
||||
|
||||
public bool Open(bool errorsExpected = false)
|
||||
{
|
||||
return _relationalConnection.Open(errorsExpected);
|
||||
}
|
||||
|
||||
public Task<bool> OpenAsync(CancellationToken cancellationToken, bool errorsExpected = false)
|
||||
{
|
||||
return _relationalConnection.OpenAsync(cancellationToken, errorsExpected);
|
||||
}
|
||||
|
||||
public bool Close()
|
||||
{
|
||||
return _relationalConnection.Close();
|
||||
}
|
||||
|
||||
|
||||
|
||||
public DbConnection DbConnection => _relationalConnection.DbConnection;
|
||||
|
||||
public DbContext Context =>
|
||||
#if !EFCORE2
|
||||
_relationalConnection.Context;
|
||||
#endif
|
||||
#if EFCORE2
|
||||
GetDbContext();
|
||||
|
||||
private DbContext GetDbContext()
|
||||
{
|
||||
var namedConnectionStringResolver = ((RelationalConnectionDependencies)_relationalConnection.GetPropertyValue("Dependencies")).ConnectionStringResolver;
|
||||
var serviceProvider = (IServiceProvider)namedConnectionStringResolver.GetPropertyValue("ApplicationServiceProvider");
|
||||
var dbContext = (DbContext)serviceProvider.GetService(_dbContextType);
|
||||
return dbContext;
|
||||
}
|
||||
|
||||
|
||||
public void RegisterBufferable(IBufferable bufferable)
|
||||
{
|
||||
_relationalConnection.RegisterBufferable(bufferable);
|
||||
}
|
||||
|
||||
public Task RegisterBufferableAsync(IBufferable bufferable, CancellationToken cancellationToken)
|
||||
{
|
||||
return _relationalConnection.RegisterBufferableAsync(bufferable, cancellationToken);
|
||||
}
|
||||
#endif
|
||||
public Guid ConnectionId => _relationalConnection.ConnectionId;
|
||||
|
||||
public int? CommandTimeout
|
||||
{
|
||||
get
|
||||
{
|
||||
return _relationalConnection.CommandTimeout;
|
||||
}
|
||||
set
|
||||
{
|
||||
_relationalConnection.CommandTimeout = value;
|
||||
}
|
||||
}
|
||||
|
||||
public IDbContextTransaction BeginTransaction(IsolationLevel isolationLevel)
|
||||
{
|
||||
return _relationalConnection.BeginTransaction(isolationLevel);
|
||||
}
|
||||
|
||||
public Task<IDbContextTransaction> BeginTransactionAsync(IsolationLevel isolationLevel,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return _relationalConnection.BeginTransactionAsync(isolationLevel, cancellationToken);
|
||||
}
|
||||
|
||||
public IDbContextTransaction UseTransaction(DbTransaction transaction)
|
||||
{
|
||||
var dbContextTransaction = _relationalConnection.UseTransaction(transaction);
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
return dbContextTransaction;
|
||||
}
|
||||
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_relationalConnection.Dispose();
|
||||
}
|
||||
|
||||
|
||||
public string ConnectionString => _relationalConnection.ConnectionString;
|
||||
#if !EFCORE2
|
||||
|
||||
public async Task<IDbContextTransaction> UseTransactionAsync(DbTransaction transaction, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var dbContextTransaction = await _relationalConnection.UseTransactionAsync(transaction, cancellationToken);
|
||||
((IShardingTransaction)Context).UseShardingTransaction(transaction);
|
||||
return dbContextTransaction;
|
||||
|
||||
}
|
||||
|
||||
public Task<bool> CloseAsync()
|
||||
{
|
||||
return _relationalConnection.CloseAsync();
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
return _relationalConnection.DisposeAsync();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
|
||||
namespace ShardingCore.EFCores
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/9/5 20:37:36
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
//public class ShardingRelationalTransaction: RelationalTransaction
|
||||
//{
|
||||
// private readonly IShardingDbContext _shardingDbContext;
|
||||
// public ShardingRelationalTransaction(IRelationalConnection connection, DbTransaction transaction, Guid transactionId, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned) : base(connection, transaction, transactionId, logger, transactionOwned)
|
||||
// {
|
||||
// _shardingDbContext = (IShardingDbContext)null;
|
||||
// _shardingDbContext.UseShardingTransaction(transaction);
|
||||
// }
|
||||
|
||||
// protected override void ClearTransaction()
|
||||
// {
|
||||
// base.ClearTransaction();
|
||||
// _shardingDbContext.UseShardingTransaction(null);
|
||||
// }
|
||||
|
||||
// protected override async Task ClearTransactionAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
// {
|
||||
// await base.ClearTransactionAsync(cancellationToken);
|
||||
// _shardingDbContext.UseShardingTransaction(null);
|
||||
|
||||
// }
|
||||
//}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
using System.Text;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
|
||||
namespace ShardingCore.EFCores
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/9/5 16:03:04
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
#if !EFCORE2
|
||||
public class ShardingRelationalTransactionFactory: RelationalTransactionFactory
|
||||
{
|
||||
private readonly RelationalTransactionFactoryDependencies _dependencies;
|
||||
public ShardingRelationalTransactionFactory(RelationalTransactionFactoryDependencies dependencies) : base(dependencies)
|
||||
{
|
||||
_dependencies = dependencies;
|
||||
}
|
||||
public override RelationalTransaction Create(IRelationalConnection connection, DbTransaction transaction, Guid transactionId,
|
||||
IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger, bool transactionOwned)
|
||||
{
|
||||
return new RelationalTransaction(new ShardingRelationalConnection(connection, transaction), transaction, transactionId, logger, transactionOwned);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#if EFCORE2
|
||||
|
||||
public class ShardingRelationalTransactionFactory<TShardingDbContext> : RelationalTransactionFactory where TShardingDbContext:DbContext,IShardingDbContext
|
||||
{
|
||||
private readonly RelationalTransactionFactoryDependencies _dependencies;
|
||||
public ShardingRelationalTransactionFactory(RelationalTransactionFactoryDependencies dependencies) : base(dependencies)
|
||||
{
|
||||
_dependencies = dependencies;
|
||||
}
|
||||
public override RelationalTransaction Create(IRelationalConnection connection, DbTransaction transaction
|
||||
, IDiagnosticsLogger<DbLoggerCategory.Database.Transaction> logger,
|
||||
bool transactionOwned)
|
||||
{
|
||||
return new RelationalTransaction(new ShardingRelationalConnection(connection, transaction,typeof(TShardingDbContext)), transaction, logger,
|
||||
transactionOwned);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
|
@ -48,7 +48,7 @@ namespace ShardingCore.Extensions
|
|||
}
|
||||
|
||||
/// <summary>
|
||||
/// IShardingTableDbContext
|
||||
/// IShardingDbContext
|
||||
/// </summary>
|
||||
/// <param name="dbContext"></param>
|
||||
/// <returns></returns>
|
||||
|
@ -59,7 +59,7 @@ namespace ShardingCore.Extensions
|
|||
return dbContext is IShardingTableDbContext;
|
||||
}
|
||||
/// <summary>
|
||||
/// IShardingTableDbContext
|
||||
/// IShardingDbContext
|
||||
/// </summary>
|
||||
/// <param name="dbContextType"></param>
|
||||
/// <returns></returns>
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using ShardingCore.Sharding.PaginationConfigurations;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Extensions.InternalExtensions
|
||||
{
|
||||
|
@ -19,7 +21,19 @@ namespace ShardingCore.Extensions.InternalExtensions
|
|||
if (total < paginationMetadata.ReverseTotalGe)
|
||||
return false;
|
||||
|
||||
return paginationMetadata.ReverseFactor * total < skip;
|
||||
return skip> paginationMetadata.ReverseFactor * total;
|
||||
}
|
||||
internal static bool IsUseUneven(this PaginationMetadata paginationMetadata,ICollection<RouteQueryResult<long>> routeQueryResults,int skip)
|
||||
{
|
||||
if (routeQueryResults.Count <= 1)
|
||||
return false;
|
||||
|
||||
if (skip < paginationMetadata.UnevenLimit)
|
||||
return false;
|
||||
var total = routeQueryResults.Sum(o => o.QueryResult);
|
||||
if(total* paginationMetadata.UnevenFactorGe < routeQueryResults.First().QueryResult)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Text;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
|
@ -26,7 +28,7 @@ namespace ShardingCore.Extensions
|
|||
// /// </summary>
|
||||
// /// <param name="dbContext"></param>
|
||||
// /// <returns></returns>
|
||||
// public static string GetShardingTableDbContextTail(this IShardingTableDbContext dbContext)
|
||||
// public static string GetShardingTableDbContextTail(this IShardingDbContext dbContext)
|
||||
// {
|
||||
// return dbContext.RouteTail?.Replace(ShardingTableDbContextFormat, string.Empty)??string.Empty;
|
||||
//
|
||||
|
@ -36,7 +38,7 @@ namespace ShardingCore.Extensions
|
|||
// /// </summary>
|
||||
// /// <param name="dbContext"></param>
|
||||
// /// <param name="tail"></param>
|
||||
// public static void SetShardingTableDbContextTail(this IShardingTableDbContext dbContext, string tail)
|
||||
// public static void SetShardingTableDbContextTail(this IShardingDbContext dbContext, string tail)
|
||||
// {
|
||||
// if (!string.IsNullOrWhiteSpace(dbContext.ModelChangeKey))
|
||||
// throw new ShardingCoreException($"repeat set ModelChangeKey in {dbContext.GetType().FullName}");
|
||||
|
@ -58,5 +60,37 @@ namespace ShardingCore.Extensions
|
|||
return expression.ToString();
|
||||
#endif
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 根据对象集合解析
|
||||
/// </summary>
|
||||
/// <typeparam name="TEntity"></typeparam>
|
||||
/// <param name="shardingDbContext"></param>
|
||||
/// <param name="entities"></param>
|
||||
/// <returns></returns>
|
||||
public static IDictionary<DbContext, IEnumerable<TEntity>> BulkShardingEnumerable<TEntity>(this IShardingDbContext shardingDbContext,
|
||||
IEnumerable<TEntity> entities) where TEntity : class
|
||||
{
|
||||
return entities.Select(o =>
|
||||
{
|
||||
var dbContext = shardingDbContext.CreateGenericDbContext(o);
|
||||
return new
|
||||
{
|
||||
DbContext = dbContext,
|
||||
Entity = o
|
||||
};
|
||||
}).GroupBy(g => g.DbContext).ToDictionary(o=>o.Key,o=>o.Select(g=>g.Entity));
|
||||
}
|
||||
/// <summary>
|
||||
/// 根据条件表达式解析
|
||||
/// </summary>
|
||||
/// <typeparam name="TEntity"></typeparam>
|
||||
/// <param name="shardingDbContext"></param>
|
||||
/// <param name="where"></param>
|
||||
/// <returns></returns>
|
||||
public static IEnumerable<DbContext> BulkShardingExpression<TEntity>(this IShardingDbContext shardingDbContext, Expression<Func<TEntity, bool>> where) where TEntity : class
|
||||
{
|
||||
return shardingDbContext.CreateExpressionDbContext(where);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,9 +14,13 @@ using System;
|
|||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.Data.Common;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using ShardingCore.EFCores;
|
||||
|
||||
namespace ShardingCore.Sharding
|
||||
{
|
||||
|
@ -30,7 +34,7 @@ namespace ShardingCore.Sharding
|
|||
/// 分表分库的dbcontext
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
public abstract class AbstractShardingDbContext<T> : DbContext, IShardingTableDbContext<T> where T : DbContext, IShardingTableDbContext
|
||||
public abstract class AbstractShardingDbContext<T> : DbContext, IShardingDbContext<T>, IShardingTransaction where T : DbContext, IShardingTableDbContext
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, DbContext> _dbContextCaches = new ConcurrentDictionary<string, DbContext>();
|
||||
private readonly IVirtualTableManager _virtualTableManager;
|
||||
|
@ -40,6 +44,7 @@ namespace ShardingCore.Sharding
|
|||
private DbContextOptions<T> _dbContextOptions;
|
||||
|
||||
private readonly object CREATELOCK = new object();
|
||||
private Guid idid = Guid.NewGuid();
|
||||
|
||||
public AbstractShardingDbContext(DbContextOptions options) : base(options)
|
||||
{
|
||||
|
@ -53,13 +58,22 @@ namespace ShardingCore.Sharding
|
|||
|
||||
public abstract Type ShardingDbContextType { get; }
|
||||
public Type ActualDbContextType => typeof(T);
|
||||
//private ShardingDatabaseFacade _database;
|
||||
//public override DatabaseFacade Database
|
||||
//{
|
||||
// get
|
||||
// {
|
||||
|
||||
// return _database ?? (_database = new ShardingDatabaseFacade(this));
|
||||
// }
|
||||
//}
|
||||
|
||||
|
||||
private DbContextOptionsBuilder<T> CreateDbContextOptionBuilder()
|
||||
{
|
||||
Type type = typeof(DbContextOptionsBuilder<>);
|
||||
type = type.MakeGenericType(ActualDbContextType);
|
||||
return (DbContextOptionsBuilder<T>) Activator.CreateInstance(type);
|
||||
return (DbContextOptionsBuilder<T>)Activator.CreateInstance(type);
|
||||
}
|
||||
|
||||
private DbContextOptions<T> CreateShareDbContextOptions()
|
||||
|
@ -73,7 +87,7 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
|
||||
var connectionString = Database.GetDbConnection().ConnectionString;
|
||||
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString,dbContextOptionBuilder);
|
||||
_shardingDbContextOptionsBuilderConfig.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
|
||||
return dbContextOptionBuilder.Options;
|
||||
}
|
||||
|
||||
|
@ -104,7 +118,7 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
if (routeTail.IsMultiEntityQuery())
|
||||
throw new ShardingCoreException("multi route not support track");
|
||||
if(!(routeTail is ISingleQueryRouteTail singleQueryRouteTail))
|
||||
if (!(routeTail is ISingleQueryRouteTail singleQueryRouteTail))
|
||||
throw new ShardingCoreException("multi route not support track");
|
||||
var cacheKey = routeTail.GetRouteTailIdenty();
|
||||
if (!_dbContextCaches.TryGetValue(cacheKey, out var dbContext))
|
||||
|
@ -113,6 +127,8 @@ namespace ShardingCore.Sharding
|
|||
_dbContextCaches.TryAdd(cacheKey, dbContext);
|
||||
}
|
||||
|
||||
if (IsBeginTransaction)
|
||||
dbContext.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
|
||||
return dbContext;
|
||||
}
|
||||
else
|
||||
|
@ -134,7 +150,27 @@ namespace ShardingCore.Sharding
|
|||
|
||||
return GetDbContext(true, _routeTailFactory.Create(tail));
|
||||
}
|
||||
|
||||
|
||||
public IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> @where) where TEntity : class
|
||||
{
|
||||
if (typeof(TEntity).IsShardingTable())
|
||||
{
|
||||
var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, typeof(TEntity)).RouteTo(new TableRouteConfig(predicate:@where));
|
||||
if (physicTable.IsEmpty())
|
||||
throw new ShardingCoreException($"{@where.ShardingPrint()} cant found ant physic table");
|
||||
return physicTable.Select(o => GetDbContext(true, _routeTailFactory.Create(o.Tail)));
|
||||
}
|
||||
else
|
||||
{
|
||||
return new[] { GetDbContext(true, _routeTailFactory.Create(string.Empty)) };
|
||||
}
|
||||
}
|
||||
|
||||
public void UseShardingTransaction(DbTransaction transaction)
|
||||
{
|
||||
_dbContextCaches.Values.ForEach(o => o.Database.UseTransaction(transaction));
|
||||
}
|
||||
|
||||
|
||||
public override EntityEntry Add(object entity)
|
||||
{
|
||||
|
@ -399,34 +435,26 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
Database.BeginTransaction();
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
using(var tran= Database.BeginTransaction())
|
||||
{
|
||||
|
||||
try
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += dbContextCache.Value.SaveChanges();
|
||||
}
|
||||
tran.Commit();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
|
||||
i += dbContextCache.Value.SaveChanges();
|
||||
}
|
||||
|
||||
if (!isBeginTransaction)
|
||||
Database.CurrentTransaction.Commit();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
Database.CurrentTransaction?.Dispose();
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
|
@ -435,35 +463,26 @@ namespace ShardingCore.Sharding
|
|||
public override int SaveChanges(bool acceptAllChangesOnSuccess)
|
||||
{
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
int i = 0;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
Database.BeginTransaction();
|
||||
using (var tran = Database.BeginTransaction())
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess);
|
||||
}
|
||||
tran.Commit();
|
||||
}
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
else
|
||||
{
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
|
||||
i += dbContextCache.Value.SaveChanges(acceptAllChangesOnSuccess);
|
||||
}
|
||||
|
||||
if (!isBeginTransaction)
|
||||
Database.CurrentTransaction.Commit();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
Database.CurrentTransaction?.Dispose();
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
|
@ -474,39 +493,27 @@ namespace ShardingCore.Sharding
|
|||
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
|
||||
int i = 0;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
await Database.BeginTransactionAsync(cancellationToken);
|
||||
using (var tran = await Database.BeginTransactionAsync(cancellationToken))
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
await tran.CommitAsync();
|
||||
}
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
else
|
||||
{
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken);
|
||||
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
if (!isBeginTransaction)
|
||||
await Database.CurrentTransaction.CommitAsync(cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
}
|
||||
|
||||
if (Database.CurrentTransaction != null)
|
||||
{
|
||||
await Database.CurrentTransaction.DisposeAsync();
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
|
@ -517,37 +524,27 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
int i = 0;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
await Database.BeginTransactionAsync(cancellationToken);
|
||||
}
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
using (var tran = await Database.BeginTransactionAsync(cancellationToken))
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
|
||||
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
if (!isBeginTransaction)
|
||||
Database.CurrentTransaction.Commit();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction) { }
|
||||
if (Database.CurrentTransaction != null)
|
||||
{
|
||||
Database.CurrentTransaction.Dispose();
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(null);
|
||||
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
tran.Commit();
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
#endif
|
||||
|
@ -556,38 +553,31 @@ namespace ShardingCore.Sharding
|
|||
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
|
||||
int i = 0;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
await Database.BeginTransactionAsync(cancellationToken);
|
||||
using (var tran = await Database.BeginTransactionAsync(cancellationToken))
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
|
||||
}
|
||||
|
||||
await tran.CommitAsync();
|
||||
}
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
else
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken);
|
||||
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
|
||||
}
|
||||
|
||||
if (!isBeginTransaction)
|
||||
await Database.CurrentTransaction.CommitAsync(cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction)
|
||||
if (Database.CurrentTransaction != null)
|
||||
{
|
||||
await Database.CurrentTransaction.DisposeAsync();
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
@ -597,37 +587,27 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
|
||||
var isBeginTransaction = IsBeginTransaction;
|
||||
int i = 0;
|
||||
//如果是内部开的事务就内部自己消化
|
||||
if (!isBeginTransaction)
|
||||
{
|
||||
await Database.BeginTransactionAsync(cancellationToken);
|
||||
}
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
using (var tran = await Database.BeginTransactionAsync(cancellationToken))
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
|
||||
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
|
||||
}
|
||||
if (!isBeginTransaction)
|
||||
Database.CurrentTransaction.Commit();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!isBeginTransaction)
|
||||
if (Database.CurrentTransaction != null)
|
||||
{
|
||||
Database.CurrentTransaction.Dispose();
|
||||
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
dbContextCache.Value.Database.UseTransaction(null);
|
||||
}
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
|
||||
}
|
||||
|
||||
tran.Commit();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var dbContextCache in _dbContextCaches)
|
||||
{
|
||||
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
|
||||
}
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
using System.Linq.Expressions;
|
||||
|
||||
namespace ShardingCore.Sharding.Abstractions
|
||||
{
|
||||
|
@ -12,30 +15,44 @@ namespace ShardingCore.Sharding.Abstractions
|
|||
*/
|
||||
public interface IShardingDbContext
|
||||
{
|
||||
/// <summary>
|
||||
/// 当前sharding的db context type
|
||||
/// </summary>
|
||||
Type ShardingDbContextType { get; }
|
||||
/// <summary>
|
||||
/// <20><>ʵ<EFBFBD><CAB5>DbContext <20><><EFBFBD><EFBFBD>
|
||||
/// 真实的db context type
|
||||
/// </summary>
|
||||
Type ActualDbContextType { get;}
|
||||
/// <summary>
|
||||
/// create DbContext
|
||||
/// </summary>
|
||||
/// <param name="track">true not care dbcontext life, false need call dispose()</param>
|
||||
/// <param name="track">true not care db context life, false need call dispose()</param>
|
||||
/// <param name="routeTail"></param>
|
||||
/// <returns></returns>
|
||||
DbContext GetDbContext(bool track,IRouteTail routeTail);
|
||||
/// <summary>
|
||||
/// <EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʵ<EFBFBD>崴<EFBFBD><EFBFBD>db context
|
||||
/// 创建通用的db context
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entity"></param>
|
||||
/// <returns></returns>
|
||||
DbContext CreateGenericDbContext<T>(T entity) where T : class;
|
||||
|
||||
/// <summary>
|
||||
/// 根据表达式创建db context
|
||||
/// </summary>
|
||||
/// <typeparam name="TEntity"></typeparam>
|
||||
/// <param name="where"></param>
|
||||
/// <returns></returns>
|
||||
|
||||
IEnumerable<DbContext> CreateExpressionDbContext<TEntity>(Expression<Func<TEntity, bool>> where)
|
||||
where TEntity : class;
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public interface IShardingTableDbContext<T> : IShardingDbContext where T : DbContext, IShardingTableDbContext
|
||||
public interface IShardingDbContext<T> : IShardingDbContext where T : DbContext, IShardingTableDbContext
|
||||
{
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
using System.Text;
|
||||
|
||||
namespace ShardingCore.Sharding.Abstractions
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/9/6 8:41:50
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public interface IShardingTransaction
|
||||
{
|
||||
void UseShardingTransaction(DbTransaction transaction);
|
||||
}
|
||||
}
|
|
@ -20,27 +20,34 @@ namespace ShardingCore.Sharding.PaginationConfigurations
|
|||
public ISet<PaginationSequenceConfig> PaginationConfigs = new HashSet<PaginationSequenceConfig>();
|
||||
|
||||
/// <summary>
|
||||
/// 反向排序因子
|
||||
/// 反向排序因子 skip>ReverseFactor * total
|
||||
/// </summary>
|
||||
public double ReverseFactor { get; set; } = -1;
|
||||
|
||||
/// <summary>
|
||||
/// 当条数大于多少条后采用反向排序
|
||||
/// 当条数大于ReverseTotalGe条后采用反向排序
|
||||
/// </summary>
|
||||
public long ReverseTotalGe { get; set; } = 10000L;
|
||||
/// <summary>
|
||||
/// 是否已开启反向排序 仅支持单排序
|
||||
/// 是否已开启反向排序 skip>ReverseFactor * total 查询条件必须存在 order by
|
||||
/// </summary>
|
||||
public bool EnableReverseShardingPage => ReverseFactor > 0 && ReverseFactor < 1 && ReverseTotalGe >= 500;
|
||||
// /// <summary>
|
||||
// /// 当出现N张表分页需要跳过X条数据,获取Y条数据除了total条数最多的那张表以外的其他表和小于TakeInMemoryMaxRangeSkip那么就启用
|
||||
// /// </summary>
|
||||
// public int TakeInMemoryMaxRangeSkip { get; set; } = 1000;
|
||||
//
|
||||
// public bool EnableTakeInMemory(int skip)
|
||||
// {
|
||||
// return skip > TakeInMemoryMaxRangeSkip && TakeInMemoryMaxRangeSkip > 500;
|
||||
// }
|
||||
|
||||
/// <summary>
|
||||
/// 极度不规则分布时当分页最大一页书占全部的(UnevenFactorGe*100)%时启用内存排序
|
||||
/// </summary>
|
||||
[Obsolete]
|
||||
public double UnevenFactorGe { get; set; } = -1;
|
||||
/// <summary>
|
||||
/// 极度不规则分布时除了total最大一张表外的其余表相加不能超过UnevenLimit
|
||||
/// </summary>
|
||||
[Obsolete]
|
||||
public int UnevenLimit { get; set; } = 300;
|
||||
/// <summary>
|
||||
/// 启用不规则分布分页 查询条件必须存在 order by
|
||||
/// </summary>
|
||||
[Obsolete]
|
||||
public bool EnableUnevenShardingPage => UnevenFactorGe > 0 && UnevenFactorGe < 1 && UnevenLimit > 0;
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,8 +90,15 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);
|
||||
if (paginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total))
|
||||
{
|
||||
return new ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity>(
|
||||
_streamMergeContext, _streamMergeContext.Orders, total);
|
||||
return new ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity>( _streamMergeContext, total);
|
||||
}
|
||||
}
|
||||
|
||||
if (paginationMetadata.EnableUnevenShardingPage)
|
||||
{
|
||||
if (paginationMetadata.IsUseUneven(_shardingPageManager.Current.RouteQueryResults, _streamMergeContext.Skip.GetValueOrDefault()))
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,12 +21,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
*/
|
||||
public class ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
|
||||
{
|
||||
private readonly IEnumerable<PropertyOrder> _primaryOrders;
|
||||
private readonly long _total;
|
||||
|
||||
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, IEnumerable<PropertyOrder> primaryOrders, long total) : base(streamMergeContext)
|
||||
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, long total) : base(streamMergeContext)
|
||||
{
|
||||
_primaryOrders = primaryOrders;
|
||||
_total = total;
|
||||
}
|
||||
|
||||
|
@ -40,7 +38,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
var realSkip = _total- take- skip;
|
||||
var tableResult = StreamMergeContext.RouteResults;
|
||||
StreamMergeContext.ReSetSkip((int)realSkip);
|
||||
var propertyOrders =_primaryOrders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray();
|
||||
var propertyOrders = StreamMergeContext.Orders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray();
|
||||
StreamMergeContext.ReSetOrders(propertyOrders);
|
||||
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders);
|
||||
var enumeratorTasks = tableResult.Select(routeResult =>
|
||||
|
|
|
@ -21,7 +21,7 @@ namespace ShardingCore
|
|||
*/
|
||||
public class ShardingConfigOption<TShardingDbContext, TActualDbContext> : IShardingConfigOption
|
||||
where TActualDbContext : DbContext, IShardingTableDbContext
|
||||
where TShardingDbContext : DbContext, IShardingTableDbContext<TActualDbContext>
|
||||
where TShardingDbContext : DbContext, IShardingDbContext<TActualDbContext>
|
||||
{
|
||||
private readonly Dictionary<Type, Type> _virtualRoutes = new Dictionary<Type, Type>();
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.7" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.7" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.9" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.9" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
|
|
|
@ -3,10 +3,12 @@ using System.Collections.Generic;
|
|||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ShardingCore.EFCores;
|
||||
using ShardingCore.Test50_2x.Domain.Entities;
|
||||
using ShardingCore.Test50_2x.Shardings;
|
||||
|
||||
|
@ -41,8 +43,9 @@ namespace ShardingCore.Test50_2x
|
|||
// ConfigureServices(HostBuilderContext hostBuilderContext, IServiceCollection services)
|
||||
public void ConfigureServices(IServiceCollection services, HostBuilderContext hostBuilderContext)
|
||||
{
|
||||
services.AddDbContext<DefaultDbContext>();
|
||||
services.AddShardingDbContext<ShardingDefaultDbContext, DefaultDbContext>(o => o.UseSqlServer(hostBuilderContext.Configuration.GetSection("SqlServer")["ConnectionString"])
|
||||
,op =>
|
||||
, op =>
|
||||
{
|
||||
op.EnsureCreatedWithOutShardingTable = true;
|
||||
op.CreateShardingTableOnStart = true;
|
||||
|
|
Loading…
Reference in New Issue