From 017b3e0082c65087a7d35d6e16e4ce45a2c31f8f Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Fri, 24 Sep 2021 10:39:08 +0800 Subject: [PATCH] =?UTF-8?q?#11=20=E6=94=AF=E6=8C=81=E5=8D=95=E4=B8=AA?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E7=9A=84=E8=BF=BD=E8=B8=AA=20first=20firstor?= =?UTF-8?q?default=20last=20lastordefault=20single=20singleordefault....?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DefaultShardingQueryExecutor.cs | 16 ++-- .../AbstractInMemoryAsyncMergeEngine.cs | 2 +- ...MethodCallWhereInMemoryAsyncMergeEngine.cs | 94 +++++++++++++++++++ .../FirstAsyncInMemoryMergeEngine.cs | 6 +- .../FirstOrDefaultAsyncInMemoryMergeEngine.cs | 6 +- .../LastAsyncInMemoryMergeEngine.cs | 6 +- .../LastOrDefaultAsyncInMemoryMergeEngine.cs | 6 +- .../SingleAsyncInMemoryMergeEngine.cs | 6 +- ...SingleOrDefaultAsyncInMemoryMergeEngine.cs | 6 +- 9 files changed, 121 insertions(+), 27 deletions(-) create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs index b69d5159..ded91357 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs @@ -79,17 +79,17 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors { case nameof(Enumerable.First): - return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.FirstOrDefault): - return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Last): - return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(LastAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.LastOrDefault): - return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Single): - return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.SingleOrDefault): - return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + return GenericShardingDbContextMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Count): return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.LongCount): @@ -144,11 +144,11 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors } - private TResult GenericMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) + private TResult GenericShardingDbContextMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) { var queryEntityType = query.GetQueryEntityType(); var resultEntityType = query.GetResultType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(),queryEntityType); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs index 29a1e555..95a26391 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs @@ -58,7 +58,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions _mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext); - _parllelDbbContexts = new List(); + _parllelDbbContexts = new LinkedList(); } /// /// 合并queryable diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs new file mode 100644 index 00000000..5b03537f --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using ShardingCore.Core.TrackerManagers; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; + +namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/24 10:16:28 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext:DbContext,IShardingDbContext + { + private readonly ITrackerManager _trackerManager; + protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) + { + _trackerManager = ShardingContainer.GetService>(); + } + + private bool IsUseTrack => GetIsUseTracker(); + + private bool GetIsUseTracker() + { + if (GetStreamMergeContext().IsNoTracking.HasValue) + { + return !GetStreamMergeContext().IsNoTracking.Value; + } + else + { + return ((DbContext)GetStreamMergeContext().GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior == + QueryTrackingBehavior.TrackAll; + } + } + public override TResult MergeResult() + { + var current = DoMergeResult(); + if (current != null) + { + if (IsUseTrack && _trackerManager.EntityUseTrack(current.GetType())) + { + var c = (object)current; + var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); + var attachedEntity = genericDbContext.GetAttachedEntity(c); + if (attachedEntity == null) + genericDbContext.Attach(current); + else + { + return (TResult)attachedEntity; + } + } + + } + return current; + } + + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + cancellationToken.ThrowIfCancellationRequested(); + var current = await DoMergeResultAsync(cancellationToken); + if (current != null) + { + if (IsUseTrack && _trackerManager.EntityUseTrack(current.GetType())) + { + var c = (object)current; + var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c); + var attachedEntity = genericDbContext.GetAttachedEntity(c); + if (attachedEntity == null) + genericDbContext.Attach(current); + else + { + return (TResult)attachedEntity; + } + } + } + return current; + } + public abstract TResult DoMergeResult(); + + public abstract Task DoMergeResultAsync( + CancellationToken cancellationToken = new CancellationToken()); + + + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs index 58c1fe07..e57d26cd 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs @@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class FirstAsyncInMemoryMergeEngine: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class FirstAsyncInMemoryMergeEngine: AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext:DbContext,IShardingDbContext { public FirstAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs index 0908796f..b8255443 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs @@ -24,18 +24,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class FirstOrDefaultAsyncInMemoryMergeEngine : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class FirstOrDefaultAsyncInMemoryMergeEngine : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public FirstOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs index 5dc604d1..237d64b3 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs @@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class LastAsyncInMemoryMergeEngine: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class LastAsyncInMemoryMergeEngine : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public LastAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastAsync(cancellationToken), cancellationToken); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs index e4daaa00..04a7a1a3 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs @@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class LastOrDefaultAsyncInMemoryMergeEngine: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class LastOrDefaultAsyncInMemoryMergeEngine : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public LastOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs index 31671927..65a037d4 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs @@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class SingleAsyncInMemoryMergeEngine: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class SingleAsyncInMemoryMergeEngine : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public SingleAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs index f952b80c..e3d3da49 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs @@ -21,19 +21,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class SingleOrDefaultAsyncInMemoryMergeEngine: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine + public class SingleOrDefaultAsyncInMemoryMergeEngine : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public SingleOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } - public override TResult MergeResult() + public override TResult DoMergeResult() { return AsyncHelper.RunSync(() => MergeResultAsync()); } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override async Task DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();