From e9226e2773804ccd8f89ad0cdb50f1a98d9e7585 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Thu, 30 Sep 2021 22:07:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96track=E7=9A=84=E5=88=A4?= =?UTF-8?q?=E6=96=AD,=E5=A6=82=E6=9E=9C=E4=B8=8D=E5=90=AF=E7=94=A8?= =?UTF-8?q?=E9=82=A3=E4=B9=88=E9=BB=98=E8=AE=A4=E4=B8=8D=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?track,=E5=A6=82=E6=9E=9C=E9=9D=9E=E8=B7=A8=E8=A1=A8=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E6=88=96=E8=80=85=E9=9D=9E=E8=B7=A8=E5=BA=93=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E9=82=A3=E4=B9=88=E5=B0=86=E5=9C=A8=E5=90=AF=E7=94=A8?= =?UTF-8?q?track=E5=90=8E=E4=BF=9D=E8=AF=81=E5=92=8C=E5=8E=9F=E7=94=9F?= =?UTF-8?q?=E4=B8=80=E6=A0=B7,=E5=A6=82=E6=9E=9C=E8=B7=A8=E8=A1=A8?= =?UTF-8?q?=E6=88=96=E8=80=85=E8=B7=A8=E5=BA=93=E5=90=8E=E9=82=A3=E4=B9=88?= =?UTF-8?q?=E5=B0=86=E4=BD=BF=E7=94=A8sharding=E7=9A=84=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 +- .../Core/TrackerManagers/ITrackerManager.cs | 5 +- .../Sharding/AbstractShardingDbContext.cs | 13 ++ .../Abstractions/IShardingDbContext.cs | 13 +- .../IShardingDbContextExecutor.cs | 10 +- .../Sharding/ActualConnectionStringManager.cs | 4 + .../DataSourceDbContext.cs | 179 ++++++++++++++++++ .../ShardingDbContextExecutor.cs | 37 ++-- .../Sharding/StreamMergeContext.cs | 101 +++++++++- .../AbstractInMemoryAsyncMergeEngine.cs | 3 - ...MethodCallWhereInMemoryAsyncMergeEngine.cs | 25 +-- .../AsyncEnumerableStreamMergeEngine.cs | 24 +-- .../AbstractEnumeratorStreamMergeEngine.cs | 10 +- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 2 - ...hardingEnumeratorAsyncStreamMergeEngine.cs | 2 - ...hardingEnumeratorAsyncStreamMergeEngine.cs | 2 - ...equenceEnumeratorAsyncStreamMergeEngine.cs | 2 - ...leQueryEnumeratorAsyncStreamMergeEngine.cs | 4 +- src/ShardingCore/ShardingContainer.cs | 5 + 19 files changed, 351 insertions(+), 104 deletions(-) create mode 100644 src/ShardingCore/Sharding/ShardingDbContextExecutors/DataSourceDbContext.cs rename src/ShardingCore/Sharding/{ => ShardingDbContextExecutors}/ShardingDbContextExecutor.cs (94%) diff --git a/README.md b/README.md index 04f4a4c4..ff619d1d 100644 --- a/README.md +++ b/README.md @@ -578,16 +578,12 @@ var list = new List(); ``` ## 自动追踪 -默认shardingcore不支持单次查询跨表自动追踪,并且也不建议使用自动追踪,如果你有需要shardingcore也默认提供了自动追踪功能 +默认shardingcore不支持自动追踪,并且也不建议使用自动追踪,如果你有需要shardingcore也默认提供了自动追踪功能 有两点需要注意 - -1.如果本次查询不涉及跨表那么支持(跨库也可以) - -2.如果设计跨表那么仅支持dbcontext的model的类型的整个查询匿名类型不支持联级查询不支持 - -3.不跨表的情况下和efcore的自动追踪一样 - -3.不跨表的情况下tolist等操作会查询数据库返回的时候判断是否已经追踪如果已经追踪则返回缓存里已经追踪了的值 +目前仅支持单主键对象 +1.shardingcore仅支持dbcontext的model的类型的整个查询匿名类型不支持联级查询不支持 +2.shardingcore的单个查询依然走数据库不走缓存如果查询出来的结果缓存里面有就返回缓存里面的而不是数据库的 +3.tolist等操作会查询数据库返回的时候判断是否已经追踪如果已经追踪则返回缓存里已经追踪了的值 4.支持 `first`,`firstordefault`,`last`,`lastordefault`,`single`,`singleordefault` 如何开启 ```c# diff --git a/src/ShardingCore/Core/TrackerManagers/ITrackerManager.cs b/src/ShardingCore/Core/TrackerManagers/ITrackerManager.cs index 5050d077..a458abea 100644 --- a/src/ShardingCore/Core/TrackerManagers/ITrackerManager.cs +++ b/src/ShardingCore/Core/TrackerManagers/ITrackerManager.cs @@ -13,9 +13,12 @@ namespace ShardingCore.Core.TrackerManagers * @Ver: 1.0 * @Email: 326308290@qq.com */ - public interface ITrackerManager where TShardingDbContext:DbContext,IShardingDbContext + public interface ITrackerManager { bool AddDbContextModel(Type entityType); bool EntityUseTrack(Type entityType); } + public interface ITrackerManager: ITrackerManager where TShardingDbContext:DbContext,IShardingDbContext + { + } } diff --git a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs index 10b94dc2..5212798d 100644 --- a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs +++ b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs @@ -24,6 +24,7 @@ using ShardingCore.Core.VirtualDatabase.VirtualDataSources; using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources; using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; +using ShardingCore.Sharding.ShardingDbContextExecutors; using ShardingCore.Sharding.ShardingTransactions; namespace ShardingCore.Sharding @@ -85,7 +86,19 @@ namespace ShardingCore.Sharding { return _shardingDbContextExecutor.CreateGenericDbContext(entity); } + /// + /// 是否启用了读写分离 + /// + /// + public bool IsUseReadWriteSeparation() + { + return _shardingDbContextExecutor.IsUseReadWriteSeparation(); + } + public bool EnableAutoTrack() + { + return _shardingDbContextExecutor.EnableAutoTrack(); + } public override EntityEntry Add(object entity) diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs b/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs index 51dcb95d..52b11b9d 100644 --- a/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs +++ b/src/ShardingCore/Sharding/Abstractions/IShardingDbContext.cs @@ -1,10 +1,6 @@ using Microsoft.EntityFrameworkCore; -using System; -using System.Collections.Generic; -using System.Data.Common; -using System.Linq.Expressions; -using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; +using System; namespace ShardingCore.Sharding.Abstractions { @@ -35,8 +31,13 @@ namespace ShardingCore.Sharding.Abstractions /// /// DbContext CreateGenericDbContext(T entity) where T : class; + /// + /// 是否启用了读写分离 + /// + /// + bool IsUseReadWriteSeparation(); - + bool EnableAutoTrack(); } diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingDbContextExecutor.cs b/src/ShardingCore/Sharding/Abstractions/IShardingDbContextExecutor.cs index ea77ce96..13fa7934 100644 --- a/src/ShardingCore/Sharding/Abstractions/IShardingDbContextExecutor.cs +++ b/src/ShardingCore/Sharding/Abstractions/IShardingDbContextExecutor.cs @@ -34,9 +34,17 @@ namespace ShardingCore.Sharding.Abstractions /// int ReadWriteSeparationPriority { get; set; } /// - /// 是否开启读写分离 + /// 当前是否开启读写分离 /// bool ReadWriteSeparation { get; set; } + + /// + /// 是否使用了读写分离 + /// + /// + bool IsUseReadWriteSeparation(); + + bool EnableAutoTrack(); /// /// create sharding db context options /// diff --git a/src/ShardingCore/Sharding/ActualConnectionStringManager.cs b/src/ShardingCore/Sharding/ActualConnectionStringManager.cs index 72bbf6a2..808b446e 100644 --- a/src/ShardingCore/Sharding/ActualConnectionStringManager.cs +++ b/src/ShardingCore/Sharding/ActualConnectionStringManager.cs @@ -35,6 +35,10 @@ namespace ShardingCore.Sharding _useReadWriteSeparation = _connectionStringManager is ReadWriteConnectionStringManager; } + public bool IsUseReadWriteSeparation() + { + return _useReadWriteSeparation; + } public string GetConnectionString(string dataSourceName, bool isWrite) { if (isWrite) diff --git a/src/ShardingCore/Sharding/ShardingDbContextExecutors/DataSourceDbContext.cs b/src/ShardingCore/Sharding/ShardingDbContextExecutors/DataSourceDbContext.cs new file mode 100644 index 00000000..cdf14f9c --- /dev/null +++ b/src/ShardingCore/Sharding/ShardingDbContextExecutors/DataSourceDbContext.cs @@ -0,0 +1,179 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; +using ShardingCore.DbContexts; +using ShardingCore.DbContexts.ShardingDbContexts; +using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.ShardingTransactions; + +namespace ShardingCore.Sharding.ShardingDbContextExecutors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/30 10:53:23 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class DataSourceDbContext : IDisposable +#if !EFCORE2 + , IAsyncDisposable +#endif + where TShardingDbContext : DbContext, IShardingDbContext + { + /// + /// 数据源名称 + /// + public string DataSourceName { get; } + private readonly IShardingDbContextFactory _shardingDbContextFactory; + + private ConcurrentDictionary _dataSourceDbContexts = + new ConcurrentDictionary(); + + private IDbContextTransaction _dbContextTransaction; + private IsolationLevel isolationLevel = IsolationLevel.Unspecified; + + private bool _isBeginTransaction; + + + /// + /// + /// + /// + /// + /// + public DataSourceDbContext(string dataSourceName, IShardingDbContextFactory shardingDbContextFactory, bool isBeginTransaction) + { + DataSourceName = dataSourceName; + _shardingDbContextFactory = shardingDbContextFactory; + _isBeginTransaction = isBeginTransaction; + } + public void BeginTransaction(IsolationLevel isolationLevel = IsolationLevel.Unspecified) + { + if (_isBeginTransaction) + throw new InvalidOperationException("transaction is already begin"); + _isBeginTransaction = true; + this.isolationLevel = isolationLevel; + } + public bool IsEmpty() + { + return !_dataSourceDbContexts.Any(); + } + + public DbContext TryGetOrCreateDbContext(IRouteTail routeTail, ShardingDbContextOptions shardingDbContextOptions) + { + var cacheKey = routeTail.GetRouteTailIdentity(); + + if (!_dataSourceDbContexts.TryGetValue(cacheKey, out var dbContext)) + { + dbContext = _shardingDbContextFactory.Create(shardingDbContextOptions); + if (_isBeginTransaction) + { + if (_dbContextTransaction == null) + { + _dbContextTransaction = dbContext.Database.BeginTransaction(isolationLevel); + } + UseTransaction(_dbContextTransaction); + } + _dataSourceDbContexts.TryAdd(cacheKey, dbContext); + } + return dbContext; + } + + public DbConnection GetDbConnection() + { + return _dataSourceDbContexts.First().Value.Database.GetDbConnection(); + } + + public void UseTransaction(IDbContextTransaction dbContextTransaction) + { + if (dbContextTransaction == null) + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + if (dataSourceDbContext.Value.Database.CurrentTransaction != null) + dataSourceDbContext.Value.Database.UseTransaction(null); + } + } + else + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + if (dataSourceDbContext.Value.Database.CurrentTransaction == null) + dataSourceDbContext.Value.Database.UseTransaction(dbContextTransaction.GetDbTransaction()); + } + } + } + public async Task UseTransactionAsync(IDbContextTransaction dbContextTransaction, CancellationToken cancellationToken = new CancellationToken()) + { + cancellationToken.ThrowIfCancellationRequested(); + if (dbContextTransaction == null) + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + if (dataSourceDbContext.Value.Database.CurrentTransaction != null) + await dataSourceDbContext.Value.Database.UseTransactionAsync(null, cancellationToken); + } + } + else + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + if (dataSourceDbContext.Value.Database.CurrentTransaction == null) + await dataSourceDbContext.Value.Database.UseTransactionAsync(dbContextTransaction.GetDbTransaction(), cancellationToken); + } + } + } + /// + /// 提交 + /// + /// + /// + public int SaveChanges(bool acceptAllChangesOnSuccess) + { + int i = 0; + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + i += dataSourceDbContext.Value.SaveChanges(acceptAllChangesOnSuccess); + } + + return i; + } + public async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken()) + { + + int i = 0; + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + i += await dataSourceDbContext.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); + } + + return i; + } + + public void Dispose() + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + dataSourceDbContext.Value.Dispose(); + } + } + + public async ValueTask DisposeAsync() + { + foreach (var dataSourceDbContext in _dataSourceDbContexts) + { + await dataSourceDbContext.Value.DisposeAsync(); + } + } + } +} diff --git a/src/ShardingCore/Sharding/ShardingDbContextExecutor.cs b/src/ShardingCore/Sharding/ShardingDbContextExecutors/ShardingDbContextExecutor.cs similarity index 94% rename from src/ShardingCore/Sharding/ShardingDbContextExecutor.cs rename to src/ShardingCore/Sharding/ShardingDbContextExecutors/ShardingDbContextExecutor.cs index d61cbbe1..321a6147 100644 --- a/src/ShardingCore/Sharding/ShardingDbContextExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingDbContextExecutors/ShardingDbContextExecutor.cs @@ -1,7 +1,13 @@ -using Microsoft.EntityFrameworkCore; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; using ShardingCore.Core.VirtualDatabase.VirtualDataSources; using ShardingCore.Core.VirtualDatabase.VirtualTables; -using ShardingCore.Core.VirtualRoutes.TableRoutes; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.DbContexts; using ShardingCore.DbContexts.ShardingDbContexts; @@ -9,16 +15,8 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.ShardingTransactions; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Data; -using System.Linq; -using System.Linq.Expressions; -using System.Threading; -using System.Threading.Tasks; -namespace ShardingCore.Sharding +namespace ShardingCore.Sharding.ShardingDbContextExecutors { /* * @Author: xjm @@ -40,6 +38,8 @@ namespace ShardingCore.Sharding private readonly IShardingDbContextFactory _shardingDbContextFactory; private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig; private readonly IRouteTailFactory _routeTailFactory; + private readonly ActualConnectionStringManager _actualConnectionStringManager; + private readonly IShardingConfigOption _shardingConfigOption; public int ReadWriteSeparationPriority { @@ -55,7 +55,6 @@ namespace ShardingCore.Sharding public bool IsBeginTransaction => CurrentShardingTransaction != null && CurrentShardingTransaction.IsBeginTransaction(); - private readonly ActualConnectionStringManager _actualConnectionStringManager; public ShardingDbContextExecutor() { @@ -66,6 +65,8 @@ namespace ShardingCore.Sharding _routeTailFactory = ShardingContainer.GetService(); _actualConnectionStringManager = new ActualConnectionStringManager(); + + _shardingConfigOption = ShardingContainer.GetServices().FirstOrDefault(o => o.ShardingDbContextType == typeof(TShardingDbContext)); } #region create db context @@ -118,6 +119,17 @@ namespace ShardingCore.Sharding { return new ShardingDbContextOptions(CreateParallelDbContextOptions(dataSourceName), routeTail); } + + public bool IsUseReadWriteSeparation() + { + return _actualConnectionStringManager.IsUseReadWriteSeparation(); + } + + public bool EnableAutoTrack() + { + return _shardingConfigOption.AutoTrackEntity; + } + public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail) { @@ -130,6 +142,7 @@ namespace ShardingCore.Sharding if (!_dbContextCaches.TryGetValue(dataSourceName, out var tailDbContexts)) { tailDbContexts = new ConcurrentDictionary(); + _dbContextCaches.TryAdd(dataSourceName, tailDbContexts); } var cacheKey = routeTail.GetRouteTailIdentity(); if (!tailDbContexts.TryGetValue(cacheKey, out var dbContext)) diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index f0d96852..69e852dc 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.Internal.StreamMerge.ReWrite; using ShardingCore.Core.Internal.Visitors; @@ -8,6 +9,8 @@ using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Sharding.Abstractions; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; +using ShardingCore.Core.TrackerManagers; using ShardingCore.Core.VirtualDatabase.VirtualTables; using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; @@ -22,7 +25,10 @@ namespace ShardingCore.Sharding * @Date: Monday, 25 January 2021 11:38:27 * @Email: 326308290@qq.com */ - public class StreamMergeContext + public class StreamMergeContext:IDisposable +#if !EFCORE2 + ,IAsyncDisposable +#endif { //private readonly IShardingScopeFactory _shardingScopeFactory; private readonly IQueryable _source; @@ -35,8 +41,8 @@ namespace ShardingCore.Sharding public int? Skip { get; private set; } public int? Take { get; } public IEnumerable Orders { get; private set; } - - public SelectContext SelectContext { get;} + + public SelectContext SelectContext { get; } public GroupByContext GroupByContext { get; } public IEnumerable TableRouteResults { get; } public DataSourceRouteResult DataSourceRouteResult { get; } @@ -57,7 +63,11 @@ namespace ShardingCore.Sharding /// public bool IsCrossTable { get; } - public StreamMergeContext(IQueryable source,IShardingDbContext shardingDbContext, + private readonly ITrackerManager _trackerManager; + + private readonly ConcurrentDictionary _parallelDbContexts; + + public StreamMergeContext(IQueryable source, IShardingDbContext shardingDbContext, DataSourceRouteResult dataSourceRouteResult, IEnumerable tableRouteResults, IRouteTailFactory routeTailFactory) @@ -76,9 +86,13 @@ namespace ShardingCore.Sharding _reWriteSource = reWriteResult.ReWriteQueryable; QueryEntities = source.ParseQueryableRoute(); DataSourceRouteResult = dataSourceRouteResult; - TableRouteResults= tableRouteResults; + TableRouteResults = tableRouteResults; IsCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1; - IsCrossTable=tableRouteResults.Count() > 1; + IsCrossTable = tableRouteResults.Count() > 1; + _trackerManager = + (ITrackerManager)ShardingContainer.GetService( + typeof(ITrackerManager<>).GetGenericType0(shardingDbContext.GetType())); + _parallelDbContexts = new ConcurrentDictionary(); //RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source); } //public StreamMergeContext(IQueryable source,IEnumerable routeResults, @@ -114,7 +128,14 @@ namespace ShardingCore.Sharding public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult) { var routeTail = _routeTailFactory.Create(tableRouteResult); - return _shardingDbContext.GetDbContext(dataSourceName, IsCrossTable, routeTail); + //˶д߱βѯǿ߿ıʾβѯdbcontextDz洢ֱdispose + var parallelQuery = IsParallelQuery(); + var dbContext = _shardingDbContext.GetDbContext(dataSourceName, parallelQuery, routeTail); + if (parallelQuery) + { + _parallelDbContexts.TryAdd(dbContext, null); + } + return dbContext; } public IRouteTail Create(TableRouteResult tableRouteResult) @@ -140,7 +161,7 @@ namespace ShardingCore.Sharding { return Skip.GetValueOrDefault() > 0 || Take.GetValueOrDefault() > 0; } - + public bool HasGroupQuery() { @@ -156,6 +177,70 @@ namespace ShardingCore.Sharding { return _shardingDbContext; } + /// + /// ǷǿԴѯ + /// + /// + private bool IsCrossQuery() + { + return IsCrossDataSource || IsCrossTable; + } + private bool IsUseReadWriteSeparation() + { + return _shardingDbContext.IsUseReadWriteSeparation(); + } + + /// + /// Ƿʹòвѯ + /// + /// + private bool IsParallelQuery() + { + return !_shardingDbContext.EnableAutoTrack()|| IsCrossQuery() || IsUseReadWriteSeparation(); + } + + /// + /// Ƿʹsharding track + /// + /// + public bool IsUseShardingTrack(Type entityType) + { + //ûпdbcontextѯҲǶдſôǷ׷֮ɲѯdbcontextд + if (!IsParallelQuery()) + return false; + return QueryTrack() && _trackerManager.EntityUseTrack(entityType); + } + private bool QueryTrack() + { + + if (IsNoTracking.HasValue) + { + return !IsNoTracking.Value; + } + else + { + return ((DbContext)_shardingDbContext).ChangeTracker.QueryTrackingBehavior == + QueryTrackingBehavior.TrackAll; + } + } + + public void Dispose() + { + foreach (var dbContext in _parallelDbContexts.Keys) + { + dbContext.Dispose(); + } + } +#if !EFCORE2 + + public async ValueTask DisposeAsync() + { + foreach (var dbContext in _parallelDbContexts.Keys) + { + await dbContext.DisposeAsync(); + } + } +#endif } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs index 95a26391..f1ca7090 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs @@ -25,7 +25,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions private readonly StreamMergeContext _mergeContext; private readonly IQueryable _queryable; private readonly Expression _secondExpression; - private readonly ICollection _parllelDbbContexts; public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) { @@ -58,7 +57,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions _mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext); - _parllelDbbContexts = new LinkedList(); } /// /// 合并queryable @@ -71,7 +69,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions private IQueryable CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult) { var shardingDbContext = _mergeContext.CreateDbContext(dsname,tableRouteResult); - _parllelDbbContexts.Add(shardingDbContext); var newQueryable = (IQueryable) GetStreamMergeContext().GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); var newCombineQueryable= DoCombineQueryable(newQueryable); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs index 18f8fd74..2e4a7c90 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -21,36 +21,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions */ public abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext:DbContext,IShardingDbContext { - private readonly ITrackerManager _trackerManager; protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { - _trackerManager = ShardingContainer.GetService>(); - } - /// - /// 手动追踪 - /// - private bool IsUseManualTrack => GetIsUseManualTrack(); - - private bool GetIsUseManualTrack() - { - if (!GetStreamMergeContext().IsCrossTable) - return false; - if (GetStreamMergeContext().IsNoTracking.HasValue) - { - return !GetStreamMergeContext().IsNoTracking.Value; - } - else - { - return ((DbContext)GetStreamMergeContext().GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior == - QueryTrackingBehavior.TrackAll; - } } public override TResult MergeResult() { var current = DoMergeResult(); if (current != null) { - if (IsUseManualTrack && _trackerManager.EntityUseTrack(current.GetType())) + if (GetStreamMergeContext().IsUseShardingTrack(current.GetType())) { var c = (object)current; var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); @@ -73,7 +52,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions var current = await DoMergeResultAsync(cancellationToken); if (current != null) { - if (IsUseManualTrack && _trackerManager.EntityUseTrack(current.GetType())) + if (GetStreamMergeContext().IsUseShardingTrack(current.GetType())) { var c = (object)current; var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index cc8bff1f..000a571c 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -19,30 +19,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines where TShardingDbContext:DbContext,IShardingDbContext { private readonly StreamMergeContext _mergeContext; - private readonly ITrackerManager _trackerManager; public AsyncEnumerableStreamMergeEngine(StreamMergeContext mergeContext) { _mergeContext = mergeContext; - _trackerManager = ShardingContainer.GetService>(); } - private bool IsUseManualTrack => GetIsUseManualTrack(); - - private bool GetIsUseManualTrack() - { - if (!_mergeContext.IsCrossTable) - return false; - if (_mergeContext.IsNoTracking.HasValue) - { - return !_mergeContext.IsNoTracking.Value; - } - else - { - return ((DbContext) _mergeContext.GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior == - QueryTrackingBehavior.TrackAll; - } - } #if !EFCORE2 public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) @@ -50,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines var asyncEnumerator = new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync(cancellationToken) .GetAsyncEnumerator(cancellationToken); - if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) + if (_mergeContext.IsUseShardingTrack(typeof(T))) { return new AsyncTrackerEnumerator(_mergeContext, asyncEnumerator); } @@ -64,7 +46,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines { var asyncEnumerator = ((IAsyncEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) .GetEnumerator(); - if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) + if (_mergeContext.IsUseShardingTrack(typeof(T))) { return new AsyncTrackerEnumerator(_mergeContext, asyncEnumerator); } @@ -78,7 +60,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines var enumerator = ((IEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) .GetEnumerator(); - if (IsUseManualTrack&&_trackerManager.EntityUseTrack(typeof(T))) + if (_mergeContext.IsUseShardingTrack(typeof(T))) { return new TrackerEnumerator(_mergeContext, enumerator); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs index 657ec7c5..e38a7fcb 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs @@ -22,13 +22,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. public abstract class AbstractEnumeratorStreamMergeEngine : IEnumeratorStreamMergeEngine { public StreamMergeContext StreamMergeContext { get; } - public ConcurrentDictionary DbContextQueryStore { get; } public AbstractEnumeratorStreamMergeEngine(StreamMergeContext streamMergeContext) { StreamMergeContext = streamMergeContext; - DbContextQueryStore = new ConcurrentDictionary(); } public abstract IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, @@ -62,13 +60,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. public void Dispose() { - if (DbContextQueryStore.IsNotEmpty()) - { - DbContextQueryStore.Values.ForEach(dbContext => - { - dbContext.Dispose(); - }); - } + StreamMergeContext.Dispose(); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs index 3ca94138..6ced731e 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs @@ -114,8 +114,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. private IQueryable CreateAsyncExecuteQueryable(string dsname, IQueryable noPaginationQueryable, SequenceResult sequenceResult, IEnumerable reSetOrders) { var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult); - if (StreamMergeContext.IsCrossTable) - DbContextQueryStore.TryAdd(sequenceResult.TableRouteResult, shardingDbContext); var newQueryable = (IQueryable)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders)) .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs index b7af2e4d..7024b115 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs @@ -46,8 +46,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. private IQueryable CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult) { var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult); - if (StreamMergeContext.IsCrossTable) - DbContextQueryStore.TryAdd(tableRouteResult, shardingDbContext); var newQueryable = (IQueryable)StreamMergeContext.GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs index 563f3ebd..c7140f6e 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -60,8 +60,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. private IQueryable CreateAsyncExecuteQueryable(string dsname,IQueryable reverseOrderQueryable, TableRouteResult tableRouteResult) { var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult); - if (StreamMergeContext.IsCrossTable) - DbContextQueryStore.TryAdd(tableRouteResult, shardingDbContext); var newQueryable = (IQueryable)reverseOrderQueryable .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs index 49209708..8212af7b 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -100,8 +100,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. private IQueryable CreateAsyncExecuteQueryable(string dsname,IQueryable noPaginationQueryable, SequenceResult sequenceResult) { var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,sequenceResult.TableRouteResult); - if (StreamMergeContext.IsCrossTable) - DbContextQueryStore.TryAdd(sequenceResult.TableRouteResult, shardingDbContext); var newQueryable = (IQueryable)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take)) .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs index 85a6aa94..ae77db09 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs @@ -25,9 +25,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. { var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First(); var routeResult = StreamMergeContext.TableRouteResults.First(); - var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName, routeResult); - if (StreamMergeContext.IsCrossTable) - DbContextQueryStore.TryAdd(routeResult, shardingDbContext); + var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult); var newQueryable = (IQueryable) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext); if (async) { diff --git a/src/ShardingCore/ShardingContainer.cs b/src/ShardingCore/ShardingContainer.cs index 20297d3e..5478d6c8 100644 --- a/src/ShardingCore/ShardingContainer.cs +++ b/src/ShardingCore/ShardingContainer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using ShardingCore.Core.VirtualDatabase.VirtualTables; @@ -35,6 +36,10 @@ namespace ShardingCore { return Services.GetService(); } + public static IEnumerable GetServices() + { + return Services.GetServices(); + } public static object GetService(Type serviceType) { return Services.GetService(serviceType);