优化track的判断,如果不启用那么默认不进行track,如果非跨表查询或者非跨库查询那么将在启用track后保证和原生一样,如果跨表或者跨库后那么将使用sharding的追踪

This commit is contained in:
xuejiaming 2021-09-30 22:07:50 +08:00
parent dd3457e135
commit e9226e2773
19 changed files with 351 additions and 104 deletions

View File

@ -578,16 +578,12 @@ var list = new List<SysUserMod>();
``` ```
## 自动追踪 ## 自动追踪
默认shardingcore不支持单次查询跨表自动追踪,并且也不建议使用自动追踪,如果你有需要shardingcore也默认提供了自动追踪功能 默认shardingcore不支持自动追踪,并且也不建议使用自动追踪,如果你有需要shardingcore也默认提供了自动追踪功能
有两点需要注意 有两点需要注意
目前仅支持单主键对象
1.如果本次查询不涉及跨表那么支持(跨库也可以) 1.shardingcore仅支持dbcontext的model的类型的整个查询匿名类型不支持联级查询不支持
2.shardingcore的单个查询依然走数据库不走缓存如果查询出来的结果缓存里面有就返回缓存里面的而不是数据库的
2.如果设计跨表那么仅支持dbcontext的model的类型的整个查询匿名类型不支持联级查询不支持 3.tolist等操作会查询数据库返回的时候判断是否已经追踪如果已经追踪则返回缓存里已经追踪了的值
3.不跨表的情况下和efcore的自动追踪一样
3.不跨表的情况下tolist等操作会查询数据库返回的时候判断是否已经追踪如果已经追踪则返回缓存里已经追踪了的值
4.支持 `first`,`firstordefault`,`last`,`lastordefault`,`single`,`singleordefault` 4.支持 `first`,`firstordefault`,`last`,`lastordefault`,`single`,`singleordefault`
如何开启 如何开启
```c# ```c#

View File

@ -13,9 +13,12 @@ namespace ShardingCore.Core.TrackerManagers
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public interface ITrackerManager<TShardingDbContext> where TShardingDbContext:DbContext,IShardingDbContext public interface ITrackerManager
{ {
bool AddDbContextModel(Type entityType); bool AddDbContextModel(Type entityType);
bool EntityUseTrack(Type entityType); bool EntityUseTrack(Type entityType);
} }
public interface ITrackerManager<TShardingDbContext>: ITrackerManager where TShardingDbContext:DbContext,IShardingDbContext
{
}
} }

View File

@ -24,6 +24,7 @@ using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources; using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
using ShardingCore.Sharding.ShardingTransactions; using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding namespace ShardingCore.Sharding
@ -85,7 +86,19 @@ namespace ShardingCore.Sharding
{ {
return _shardingDbContextExecutor.CreateGenericDbContext(entity); return _shardingDbContextExecutor.CreateGenericDbContext(entity);
} }
/// <summary>
/// 是否启用了读写分离
/// </summary>
/// <returns></returns>
public bool IsUseReadWriteSeparation()
{
return _shardingDbContextExecutor.IsUseReadWriteSeparation();
}
public bool EnableAutoTrack()
{
return _shardingDbContextExecutor.EnableAutoTrack();
}
public override EntityEntry Add(object entity) public override EntityEntry Add(object entity)

View File

@ -1,10 +1,6 @@
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq.Expressions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using System;
namespace ShardingCore.Sharding.Abstractions namespace ShardingCore.Sharding.Abstractions
{ {
@ -35,8 +31,13 @@ namespace ShardingCore.Sharding.Abstractions
/// <param name="entity"></param> /// <param name="entity"></param>
/// <returns></returns> /// <returns></returns>
DbContext CreateGenericDbContext<T>(T entity) where T : class; DbContext CreateGenericDbContext<T>(T entity) where T : class;
/// <summary>
/// 是否启用了读写分离
/// </summary>
/// <returns></returns>
bool IsUseReadWriteSeparation();
bool EnableAutoTrack();
} }

View File

@ -34,9 +34,17 @@ namespace ShardingCore.Sharding.Abstractions
/// </summary> /// </summary>
int ReadWriteSeparationPriority { get; set; } int ReadWriteSeparationPriority { get; set; }
/// <summary> /// <summary>
/// 是否开启读写分离 /// 当前是否开启读写分离
/// </summary> /// </summary>
bool ReadWriteSeparation { get; set; } bool ReadWriteSeparation { get; set; }
/// <summary>
/// 是否使用了读写分离
/// </summary>
/// <returns></returns>
bool IsUseReadWriteSeparation();
bool EnableAutoTrack();
/// <summary> /// <summary>
/// create sharding db context options /// create sharding db context options
/// </summary> /// </summary>

View File

@ -35,6 +35,10 @@ namespace ShardingCore.Sharding
_useReadWriteSeparation = _connectionStringManager is ReadWriteConnectionStringManager<TShardingDbContext>; _useReadWriteSeparation = _connectionStringManager is ReadWriteConnectionStringManager<TShardingDbContext>;
} }
public bool IsUseReadWriteSeparation()
{
return _useReadWriteSeparation;
}
public string GetConnectionString(string dataSourceName, bool isWrite) public string GetConnectionString(string dataSourceName, bool isWrite)
{ {
if (isWrite) if (isWrite)

View File

@ -0,0 +1,179 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingTransactions;
namespace ShardingCore.Sharding.ShardingDbContextExecutors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/30 10:53:23
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class DataSourceDbContext<TShardingDbContext> : IDisposable
#if !EFCORE2
, IAsyncDisposable
#endif
where TShardingDbContext : DbContext, IShardingDbContext
{
/// <summary>
/// 数据源名称
/// </summary>
public string DataSourceName { get; }
private readonly IShardingDbContextFactory<TShardingDbContext> _shardingDbContextFactory;
private ConcurrentDictionary<string, DbContext> _dataSourceDbContexts =
new ConcurrentDictionary<string, DbContext>();
private IDbContextTransaction _dbContextTransaction;
private IsolationLevel isolationLevel = IsolationLevel.Unspecified;
private bool _isBeginTransaction;
/// <summary>
///
/// </summary>
/// <param name="dataSourceName"></param>
/// <param name="shardingDbContextFactory"></param>
/// <param name="isBeginTransaction"></param>
public DataSourceDbContext(string dataSourceName, IShardingDbContextFactory<TShardingDbContext> shardingDbContextFactory, bool isBeginTransaction)
{
DataSourceName = dataSourceName;
_shardingDbContextFactory = shardingDbContextFactory;
_isBeginTransaction = isBeginTransaction;
}
public void BeginTransaction(IsolationLevel isolationLevel = IsolationLevel.Unspecified)
{
if (_isBeginTransaction)
throw new InvalidOperationException("transaction is already begin");
_isBeginTransaction = true;
this.isolationLevel = isolationLevel;
}
public bool IsEmpty()
{
return !_dataSourceDbContexts.Any();
}
public DbContext TryGetOrCreateDbContext(IRouteTail routeTail, ShardingDbContextOptions shardingDbContextOptions)
{
var cacheKey = routeTail.GetRouteTailIdentity();
if (!_dataSourceDbContexts.TryGetValue(cacheKey, out var dbContext))
{
dbContext = _shardingDbContextFactory.Create(shardingDbContextOptions);
if (_isBeginTransaction)
{
if (_dbContextTransaction == null)
{
_dbContextTransaction = dbContext.Database.BeginTransaction(isolationLevel);
}
UseTransaction(_dbContextTransaction);
}
_dataSourceDbContexts.TryAdd(cacheKey, dbContext);
}
return dbContext;
}
public DbConnection GetDbConnection()
{
return _dataSourceDbContexts.First().Value.Database.GetDbConnection();
}
public void UseTransaction(IDbContextTransaction dbContextTransaction)
{
if (dbContextTransaction == null)
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction != null)
dataSourceDbContext.Value.Database.UseTransaction(null);
}
}
else
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction == null)
dataSourceDbContext.Value.Database.UseTransaction(dbContextTransaction.GetDbTransaction());
}
}
}
public async Task UseTransactionAsync(IDbContextTransaction dbContextTransaction, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (dbContextTransaction == null)
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction != null)
await dataSourceDbContext.Value.Database.UseTransactionAsync(null, cancellationToken);
}
}
else
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
if (dataSourceDbContext.Value.Database.CurrentTransaction == null)
await dataSourceDbContext.Value.Database.UseTransactionAsync(dbContextTransaction.GetDbTransaction(), cancellationToken);
}
}
}
/// <summary>
/// 提交
/// </summary>
/// <param name="acceptAllChangesOnSuccess"></param>
/// <returns></returns>
public int SaveChanges(bool acceptAllChangesOnSuccess)
{
int i = 0;
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
i += dataSourceDbContext.Value.SaveChanges(acceptAllChangesOnSuccess);
}
return i;
}
public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{
int i = 0;
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
i += await dataSourceDbContext.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
return i;
}
public void Dispose()
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
dataSourceDbContext.Value.Dispose();
}
}
public async ValueTask DisposeAsync()
{
foreach (var dataSourceDbContext in _dataSourceDbContexts)
{
await dataSourceDbContext.Value.DisposeAsync();
}
}
}
}

View File

@ -1,7 +1,13 @@
using Microsoft.EntityFrameworkCore; using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources; using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.DbContexts; using ShardingCore.DbContexts;
using ShardingCore.DbContexts.ShardingDbContexts; using ShardingCore.DbContexts.ShardingDbContexts;
@ -9,16 +15,8 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingTransactions; using ShardingCore.Sharding.ShardingTransactions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding namespace ShardingCore.Sharding.ShardingDbContextExecutors
{ {
/* /*
* @Author: xjm * @Author: xjm
@ -40,6 +38,8 @@ namespace ShardingCore.Sharding
private readonly IShardingDbContextFactory<TShardingDbContext> _shardingDbContextFactory; private readonly IShardingDbContextFactory<TShardingDbContext> _shardingDbContextFactory;
private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig; private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig;
private readonly IRouteTailFactory _routeTailFactory; private readonly IRouteTailFactory _routeTailFactory;
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
private readonly IShardingConfigOption _shardingConfigOption;
public int ReadWriteSeparationPriority public int ReadWriteSeparationPriority
{ {
@ -55,7 +55,6 @@ namespace ShardingCore.Sharding
public bool IsBeginTransaction => CurrentShardingTransaction != null && CurrentShardingTransaction.IsBeginTransaction(); public bool IsBeginTransaction => CurrentShardingTransaction != null && CurrentShardingTransaction.IsBeginTransaction();
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
public ShardingDbContextExecutor() public ShardingDbContextExecutor()
{ {
@ -66,6 +65,8 @@ namespace ShardingCore.Sharding
_routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>(); _routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
_actualConnectionStringManager = new ActualConnectionStringManager<TShardingDbContext>(); _actualConnectionStringManager = new ActualConnectionStringManager<TShardingDbContext>();
_shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>().FirstOrDefault(o => o.ShardingDbContextType == typeof(TShardingDbContext));
} }
#region create db context #region create db context
@ -118,6 +119,17 @@ namespace ShardingCore.Sharding
{ {
return new ShardingDbContextOptions(CreateParallelDbContextOptions(dataSourceName), routeTail); return new ShardingDbContextOptions(CreateParallelDbContextOptions(dataSourceName), routeTail);
} }
public bool IsUseReadWriteSeparation()
{
return _actualConnectionStringManager.IsUseReadWriteSeparation();
}
public bool EnableAutoTrack()
{
return _shardingConfigOption.AutoTrackEntity;
}
public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail) public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail)
{ {
@ -130,6 +142,7 @@ namespace ShardingCore.Sharding
if (!_dbContextCaches.TryGetValue(dataSourceName, out var tailDbContexts)) if (!_dbContextCaches.TryGetValue(dataSourceName, out var tailDbContexts))
{ {
tailDbContexts = new ConcurrentDictionary<string, DbContext>(); tailDbContexts = new ConcurrentDictionary<string, DbContext>();
_dbContextCaches.TryAdd(dataSourceName, tailDbContexts);
} }
var cacheKey = routeTail.GetRouteTailIdentity(); var cacheKey = routeTail.GetRouteTailIdentity();
if (!tailDbContexts.TryGetValue(cacheKey, out var dbContext)) if (!tailDbContexts.TryGetValue(cacheKey, out var dbContext))

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Concurrent;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.StreamMerge.ReWrite; using ShardingCore.Core.Internal.StreamMerge.ReWrite;
using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.Internal.Visitors;
@ -8,6 +9,8 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine; using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
@ -22,7 +25,10 @@ namespace ShardingCore.Sharding
* @Date: Monday, 25 January 2021 11:38:27 * @Date: Monday, 25 January 2021 11:38:27
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class StreamMergeContext<T> public class StreamMergeContext<T>:IDisposable
#if !EFCORE2
,IAsyncDisposable
#endif
{ {
//private readonly IShardingScopeFactory _shardingScopeFactory; //private readonly IShardingScopeFactory _shardingScopeFactory;
private readonly IQueryable<T> _source; private readonly IQueryable<T> _source;
@ -35,8 +41,8 @@ namespace ShardingCore.Sharding
public int? Skip { get; private set; } public int? Skip { get; private set; }
public int? Take { get; } public int? Take { get; }
public IEnumerable<PropertyOrder> Orders { get; private set; } public IEnumerable<PropertyOrder> Orders { get; private set; }
public SelectContext SelectContext { get;} public SelectContext SelectContext { get; }
public GroupByContext GroupByContext { get; } public GroupByContext GroupByContext { get; }
public IEnumerable<TableRouteResult> TableRouteResults { get; } public IEnumerable<TableRouteResult> TableRouteResults { get; }
public DataSourceRouteResult DataSourceRouteResult { get; } public DataSourceRouteResult DataSourceRouteResult { get; }
@ -57,7 +63,11 @@ namespace ShardingCore.Sharding
/// </summary> /// </summary>
public bool IsCrossTable { get; } public bool IsCrossTable { get; }
public StreamMergeContext(IQueryable<T> source,IShardingDbContext shardingDbContext, private readonly ITrackerManager _trackerManager;
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
public StreamMergeContext(IQueryable<T> source, IShardingDbContext shardingDbContext,
DataSourceRouteResult dataSourceRouteResult, DataSourceRouteResult dataSourceRouteResult,
IEnumerable<TableRouteResult> tableRouteResults, IEnumerable<TableRouteResult> tableRouteResults,
IRouteTailFactory routeTailFactory) IRouteTailFactory routeTailFactory)
@ -76,9 +86,13 @@ namespace ShardingCore.Sharding
_reWriteSource = reWriteResult.ReWriteQueryable; _reWriteSource = reWriteResult.ReWriteQueryable;
QueryEntities = source.ParseQueryableRoute(); QueryEntities = source.ParseQueryableRoute();
DataSourceRouteResult = dataSourceRouteResult; DataSourceRouteResult = dataSourceRouteResult;
TableRouteResults= tableRouteResults; TableRouteResults = tableRouteResults;
IsCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1; IsCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1;
IsCrossTable=tableRouteResults.Count() > 1; IsCrossTable = tableRouteResults.Count() > 1;
_trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(shardingDbContext.GetType()));
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
//RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source); //RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
} }
//public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults, //public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults,
@ -114,7 +128,14 @@ namespace ShardingCore.Sharding
public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult) public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult)
{ {
var routeTail = _routeTailFactory.Create(tableRouteResult); var routeTail = _routeTailFactory.Create(tableRouteResult);
return _shardingDbContext.GetDbContext(dataSourceName, IsCrossTable, routeTail); //如果开启了读写分离或者本次查询是跨表或者跨库的表示本次查询的dbcontext是不存储的用完后就直接dispose
var parallelQuery = IsParallelQuery();
var dbContext = _shardingDbContext.GetDbContext(dataSourceName, parallelQuery, routeTail);
if (parallelQuery)
{
_parallelDbContexts.TryAdd(dbContext, null);
}
return dbContext;
} }
public IRouteTail Create(TableRouteResult tableRouteResult) public IRouteTail Create(TableRouteResult tableRouteResult)
@ -140,7 +161,7 @@ namespace ShardingCore.Sharding
{ {
return Skip.GetValueOrDefault() > 0 || Take.GetValueOrDefault() > 0; return Skip.GetValueOrDefault() > 0 || Take.GetValueOrDefault() > 0;
} }
public bool HasGroupQuery() public bool HasGroupQuery()
{ {
@ -156,6 +177,70 @@ namespace ShardingCore.Sharding
{ {
return _shardingDbContext; return _shardingDbContext;
} }
/// <summary>
/// 是否是跨资源查询
/// </summary>
/// <returns></returns>
private bool IsCrossQuery()
{
return IsCrossDataSource || IsCrossTable;
}
private bool IsUseReadWriteSeparation()
{
return _shardingDbContext.IsUseReadWriteSeparation();
}
/// <summary>
/// 是否使用并行查询
/// </summary>
/// <returns></returns>
private bool IsParallelQuery()
{
return !_shardingDbContext.EnableAutoTrack()|| IsCrossQuery() || IsUseReadWriteSeparation();
}
/// <summary>
/// 是否使用sharding track
/// </summary>
/// <returns></returns>
public bool IsUseShardingTrack(Type entityType)
{
//没有跨dbcontext查询并且不是读写分离才可以那么是否追踪之类的由查询的dbcontext自行处理
if (!IsParallelQuery())
return false;
return QueryTrack() && _trackerManager.EntityUseTrack(entityType);
}
private bool QueryTrack()
{
if (IsNoTracking.HasValue)
{
return !IsNoTracking.Value;
}
else
{
return ((DbContext)_shardingDbContext).ChangeTracker.QueryTrackingBehavior ==
QueryTrackingBehavior.TrackAll;
}
}
public void Dispose()
{
foreach (var dbContext in _parallelDbContexts.Keys)
{
dbContext.Dispose();
}
}
#if !EFCORE2
public async ValueTask DisposeAsync()
{
foreach (var dbContext in _parallelDbContexts.Keys)
{
await dbContext.DisposeAsync();
}
}
#endif
} }
} }

View File

@ -25,7 +25,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
private readonly StreamMergeContext<TEntity> _mergeContext; private readonly StreamMergeContext<TEntity> _mergeContext;
private readonly IQueryable<TEntity> _queryable; private readonly IQueryable<TEntity> _queryable;
private readonly Expression _secondExpression; private readonly Expression _secondExpression;
private readonly ICollection<DbContext> _parllelDbbContexts;
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{ {
@ -58,7 +57,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
_mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext); _mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext);
_parllelDbbContexts = new LinkedList<DbContext>();
} }
/// <summary> /// <summary>
/// 合并queryable /// 合并queryable
@ -71,7 +69,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname,TableRouteResult tableRouteResult) private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname,TableRouteResult tableRouteResult)
{ {
var shardingDbContext = _mergeContext.CreateDbContext(dsname,tableRouteResult); var shardingDbContext = _mergeContext.CreateDbContext(dsname,tableRouteResult);
_parllelDbbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<TEntity>) GetStreamMergeContext().GetReWriteQueryable() var newQueryable = (IQueryable<TEntity>) GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
var newCombineQueryable= DoCombineQueryable<TResult>(newQueryable); var newCombineQueryable= DoCombineQueryable<TResult>(newQueryable);

View File

@ -21,36 +21,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
*/ */
public abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> where TShardingDbContext:DbContext,IShardingDbContext public abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> where TShardingDbContext:DbContext,IShardingDbContext
{ {
private readonly ITrackerManager<TShardingDbContext> _trackerManager;
protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
_trackerManager = ShardingContainer.GetService<ITrackerManager<TShardingDbContext>>();
}
/// <summary>
/// 手动追踪
/// </summary>
private bool IsUseManualTrack => GetIsUseManualTrack();
private bool GetIsUseManualTrack()
{
if (!GetStreamMergeContext().IsCrossTable)
return false;
if (GetStreamMergeContext().IsNoTracking.HasValue)
{
return !GetStreamMergeContext().IsNoTracking.Value;
}
else
{
return ((DbContext)GetStreamMergeContext().GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior ==
QueryTrackingBehavior.TrackAll;
}
} }
public override TResult MergeResult<TResult>() public override TResult MergeResult<TResult>()
{ {
var current = DoMergeResult<TResult>(); var current = DoMergeResult<TResult>();
if (current != null) if (current != null)
{ {
if (IsUseManualTrack && _trackerManager.EntityUseTrack(current.GetType())) if (GetStreamMergeContext().IsUseShardingTrack(current.GetType()))
{ {
var c = (object)current; var c = (object)current;
var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c);
@ -73,7 +52,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
var current = await DoMergeResultAsync<TResult>(cancellationToken); var current = await DoMergeResultAsync<TResult>(cancellationToken);
if (current != null) if (current != null)
{ {
if (IsUseManualTrack && _trackerManager.EntityUseTrack(current.GetType())) if (GetStreamMergeContext().IsUseShardingTrack(current.GetType()))
{ {
var c = (object)current; var c = (object)current;
var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c);

View File

@ -19,30 +19,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines
where TShardingDbContext:DbContext,IShardingDbContext where TShardingDbContext:DbContext,IShardingDbContext
{ {
private readonly StreamMergeContext<T> _mergeContext; private readonly StreamMergeContext<T> _mergeContext;
private readonly ITrackerManager<TShardingDbContext> _trackerManager;
public AsyncEnumerableStreamMergeEngine(StreamMergeContext<T> mergeContext) public AsyncEnumerableStreamMergeEngine(StreamMergeContext<T> mergeContext)
{ {
_mergeContext = mergeContext; _mergeContext = mergeContext;
_trackerManager = ShardingContainer.GetService<ITrackerManager<TShardingDbContext>>();
} }
private bool IsUseManualTrack => GetIsUseManualTrack();
private bool GetIsUseManualTrack()
{
if (!_mergeContext.IsCrossTable)
return false;
if (_mergeContext.IsNoTracking.HasValue)
{
return !_mergeContext.IsNoTracking.Value;
}
else
{
return ((DbContext) _mergeContext.GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior ==
QueryTrackingBehavior.TrackAll;
}
}
#if !EFCORE2 #if !EFCORE2
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
@ -50,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
var asyncEnumerator = new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync(cancellationToken) var asyncEnumerator = new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken); .GetAsyncEnumerator(cancellationToken);
if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator); return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator);
} }
@ -64,7 +46,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{ {
var asyncEnumerator = ((IAsyncEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync()) var asyncEnumerator = ((IAsyncEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync())
.GetEnumerator(); .GetEnumerator();
if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator); return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator);
} }
@ -78,7 +60,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
var enumerator = ((IEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync()) var enumerator = ((IEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync())
.GetEnumerator(); .GetEnumerator();
if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new TrackerEnumerator<T>(_mergeContext, enumerator); return new TrackerEnumerator<T>(_mergeContext, enumerator);
} }

View File

@ -22,13 +22,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : IEnumeratorStreamMergeEngine<TEntity> public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : IEnumeratorStreamMergeEngine<TEntity>
{ {
public StreamMergeContext<TEntity> StreamMergeContext { get; } public StreamMergeContext<TEntity> StreamMergeContext { get; }
public ConcurrentDictionary<TableRouteResult, DbContext> DbContextQueryStore { get; }
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{ {
StreamMergeContext = streamMergeContext; StreamMergeContext = streamMergeContext;
DbContextQueryStore = new ConcurrentDictionary<TableRouteResult, DbContext>();
} }
public abstract IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async, public abstract IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
@ -62,13 +60,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
public void Dispose() public void Dispose()
{ {
if (DbContextQueryStore.IsNotEmpty()) StreamMergeContext.Dispose();
{
DbContextQueryStore.Values.ForEach(dbContext =>
{
dbContext.Dispose();
});
}
} }
} }

View File

@ -114,8 +114,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname, IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders) private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname, IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders)
{ {
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult); var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult);
if (StreamMergeContext.IsCrossTable)
DbContextQueryStore.TryAdd(sequenceResult.TableRouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders)) var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders))
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
return newQueryable; return newQueryable;

View File

@ -46,8 +46,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult) private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult)
{ {
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult); var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult);
if (StreamMergeContext.IsCrossTable)
DbContextQueryStore.TryAdd(tableRouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetReWriteQueryable() var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
return newQueryable; return newQueryable;

View File

@ -60,8 +60,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult) private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult)
{ {
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult); var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult);
if (StreamMergeContext.IsCrossTable)
DbContextQueryStore.TryAdd(tableRouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
return newQueryable; return newQueryable;

View File

@ -100,8 +100,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult) private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult)
{ {
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,sequenceResult.TableRouteResult); var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,sequenceResult.TableRouteResult);
if (StreamMergeContext.IsCrossTable)
DbContextQueryStore.TryAdd(sequenceResult.TableRouteResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take)) var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take))
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
return newQueryable; return newQueryable;

View File

@ -25,9 +25,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
{ {
var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First(); var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First();
var routeResult = StreamMergeContext.TableRouteResults.First(); var routeResult = StreamMergeContext.TableRouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName, routeResult); var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult);
if (StreamMergeContext.IsCrossTable)
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext); var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
if (async) if (async)
{ {

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualDatabase.VirtualTables;
@ -35,6 +36,10 @@ namespace ShardingCore
{ {
return Services.GetService<T>(); return Services.GetService<T>();
} }
public static IEnumerable<T> GetServices<T>()
{
return Services.GetServices<T>();
}
public static object GetService(Type serviceType) public static object GetService(Type serviceType)
{ {
return Services.GetService(serviceType); return Services.GetService(serviceType);