From 74c6f7f474a83fcb642397a70ff24fa577433b74 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Sun, 3 Oct 2021 14:09:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=91=BD=E5=90=8D=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=E6=95=B4=E7=90=86?= =?UTF-8?q?=E6=A1=86=E6=9E=B6=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- samples/Sample.BulkConsole/Program.cs | 1 + .../Controllers/ValuesController.cs | 1 + .../EFCores/ShardingQueryCompiler.cs | 3 + .../ShardingPageExtension.cs} | 10 +- .../ShardingPagedResult.cs | 2 +- .../Sharding/AbstractShardingDbContext.cs | 29 +--- .../Abstractions/AbstractBaseMergeEngine.cs | 17 ++ ...hodCallConstantInMemoryAsyncMergeEngine.cs | 7 +- ...nsureMethodCallInMemoryAsyncMergeEngine.cs | 8 +- ...hodCallSelectorInMemoryAsyncMergeEngine.cs | 3 +- ...MethodCallWhereInMemoryAsyncMergeEngine.cs | 7 +- ...nericMethodCallInMemoryAsyncMergeEngine.cs | 8 +- ...hodCallSelectorInMemoryAsyncMergeEngine.cs | 7 +- ...MethodCallWhereInMemoryAsyncMergeEngine.cs | 7 +- ...MethodCallWhereInMemoryAsyncMergeEngine.cs | 9 +- .../AbstractInMemoryAsyncMergeEngine.cs | 39 ++--- .../InMemoryMerge}/IEnsureMergeResult.cs | 7 +- .../InMemoryMerge}/IGenericMergeResult.cs | 7 +- .../IInMemoryAsyncMergeEngine.cs | 4 +- .../AbstractEnumeratorStreamMergeEngine.cs | 149 ++++++++++++++++++ .../IEnumeratorStreamMergeEngine.cs | 2 +- .../AverageAsyncInMemoryMergeEngine.cs | 3 +- .../MaxAsyncInMemoryMergeEngine.cs | 3 +- .../MinAsyncInMemoryMergeEngine.cs | 3 +- .../SumAsyncInMemoryMergeEngine.cs | 3 +- .../AllAsyncInMemoryMergeEngine.cs | 3 +- .../AnyAsyncInMemoryMergeEngine.cs | 3 +- .../ContainsAsyncInMemoryMergeEngine.cs | 3 +- .../CountAsyncInMemoryMergeEngine.cs | 2 +- .../AsyncEnumeratorStreamMergeEngine.cs} | 16 +- .../Base/SequencePaginationList.cs | 0 ...equenceEnumeratorAsyncStreamMergeEngine.cs | 11 +- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 15 +- ...hardingEnumeratorAsyncStreamMergeEngine.cs | 12 +- ...equenceEnumeratorAsyncStreamMergeEngine.cs | 12 +- ...leQueryEnumeratorAsyncStreamMergeEngine.cs | 14 +- .../EnumeratorStreamMergeEngineFactory.cs} | 23 +-- .../FirstAsyncInMemoryMergeEngine.cs | 3 +- .../FirstOrDefaultAsyncInMemoryMergeEngine.cs | 3 +- .../LastAsyncInMemoryMergeEngine.cs | 3 +- .../LastOrDefaultAsyncInMemoryMergeEngine.cs | 3 +- .../LongCountAsyncInMemoryMergeEngine.cs | 3 +- .../RouteQueryResult.cs | 0 .../SingleAsyncInMemoryMergeEngine.cs | 3 +- ...SingleOrDefaultAsyncInMemoryMergeEngine.cs | 3 +- .../AsyncTrackerEnumerator.cs | 0 .../TrackerEnumerators/TrackerEnumerator.cs | 0 .../DefaultShardingQueryExecutor.cs | 13 +- ...bstractEnumeratorAsyncStreamMergeEngine.cs | 100 ------------ .../AbstractEnumeratorStreamMergeEngine.cs | 67 -------- 50 files changed, 300 insertions(+), 354 deletions(-) rename src/ShardingCore/Extensions/{ShardingQueryableExtension.cs => ShardingPageExtensions/ShardingPageExtension.cs} (91%) rename src/ShardingCore/{ => Extensions/ShardingPageExtensions}/ShardingPagedResult.cs (94%) create mode 100644 src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines => MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines}/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs (79%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines}/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs (77%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines => MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines}/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines => MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines}/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs (74%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines}/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs (78%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines => MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines}/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs (85%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines => MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines}/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs (75%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines}/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs (90%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge}/AbstractInMemoryAsyncMergeEngine.cs (78%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge}/IEnsureMergeResult.cs (82%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge}/IGenericMergeResult.cs (83%) rename src/ShardingCore/Sharding/{StreamMergeEngines/Abstractions => MergeEngines/Abstractions/InMemoryMerge}/IInMemoryAsyncMergeEngine.cs (82%) create mode 100644 src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs rename src/ShardingCore/Sharding/{StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions => MergeEngines/Abstractions/StreamMerge}/IEnumeratorStreamMergeEngine.cs (77%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs (97%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs (89%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs (89%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs (97%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AllAsyncInMemoryMergeEngine.cs (89%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/AnyAsyncInMemoryMergeEngine.cs (89%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/ContainsAsyncInMemoryMergeEngine.cs (88%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/CountAsyncInMemoryMergeEngine.cs (94%) rename src/ShardingCore/Sharding/{StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs => MergeEngines/EnumeratorStreamMergeEngines/AsyncEnumeratorStreamMergeEngine.cs} (72%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs (100%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs (77%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs (87%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs (90%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs (73%) rename src/ShardingCore/Sharding/{ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs => MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs} (93%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/FirstAsyncInMemoryMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/FirstOrDefaultAsyncInMemoryMergeEngine.cs (92%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/LastAsyncInMemoryMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/LastOrDefaultAsyncInMemoryMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/LongCountAsyncInMemoryMergeEngine.cs (92%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/RouteQueryResult.cs (100%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/SingleAsyncInMemoryMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/SingleOrDefaultAsyncInMemoryMergeEngine.cs (91%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/TrackerEnumerators/AsyncTrackerEnumerator.cs (100%) rename src/ShardingCore/Sharding/{StreamMergeEngines => MergeEngines}/TrackerEnumerators/TrackerEnumerator.cs (100%) delete mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs delete mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs diff --git a/samples/Sample.BulkConsole/Program.cs b/samples/Sample.BulkConsole/Program.cs index 1402d0cf..25a91faf 100644 --- a/samples/Sample.BulkConsole/Program.cs +++ b/samples/Sample.BulkConsole/Program.cs @@ -10,6 +10,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using ShardingCore.Extensions.ShardingPageExtensions; namespace Sample.BulkConsole { diff --git a/samples/Sample.SqlServer/Controllers/ValuesController.cs b/samples/Sample.SqlServer/Controllers/ValuesController.cs index c0960ab7..b3261eda 100644 --- a/samples/Sample.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.SqlServer/Controllers/ValuesController.cs @@ -13,6 +13,7 @@ using Sample.SqlServer.Domain.Entities; using ShardingCore.Core.QueryRouteManagers.Abstractions; using ShardingCore.DbContexts.VirtualDbContexts; using ShardingCore.Extensions; +using ShardingCore.Extensions.ShardingPageExtensions; namespace Sample.SqlServer.Controllers { diff --git a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs index e6b95f00..895e8e40 100644 --- a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs +++ b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs @@ -36,6 +36,7 @@ namespace ShardingCore.EFCores } + #if !EFCORE2 public TResult ExecuteAsync(Expression query, CancellationToken cancellationToken) @@ -66,11 +67,13 @@ namespace ShardingCore.EFCores public IAsyncEnumerable ExecuteAsync(Expression query) { return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query); + } public Task ExecuteAsync(Expression query, CancellationToken cancellationToken) { return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query, cancellationToken); + } public Func CreateCompiledQuery(Expression query) diff --git a/src/ShardingCore/Extensions/ShardingQueryableExtension.cs b/src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPageExtension.cs similarity index 91% rename from src/ShardingCore/Extensions/ShardingQueryableExtension.cs rename to src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPageExtension.cs index 46ca5585..28132915 100644 --- a/src/ShardingCore/Extensions/ShardingQueryableExtension.cs +++ b/src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPageExtension.cs @@ -1,12 +1,10 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.ShardingPage.Abstractions; -namespace ShardingCore.Extensions +namespace ShardingCore.Extensions.ShardingPageExtensions { /* * @Author: xjm @@ -15,7 +13,7 @@ namespace ShardingCore.Extensions * @Ver: 1.0 * @Email: 326308290@qq.com */ - public static class ShardingQueryableExtension + public static class ShardingPageExtension { public static async Task> ToShardingPageAsync(this IQueryable source, int pageIndex, int pageSize) { @@ -49,7 +47,7 @@ namespace ShardingCore.Extensions using (shardingPageManager.CreateScope()) { //获取每次总记录数 - var count = source.Count(); + var count = source.LongCount(); if (count <= skip) return new ShardingPagedResult(new List(0), count); var data = source.Skip(skip).Take(take).ToList(); diff --git a/src/ShardingCore/ShardingPagedResult.cs b/src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPagedResult.cs similarity index 94% rename from src/ShardingCore/ShardingPagedResult.cs rename to src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPagedResult.cs index 484dd632..0c42839d 100644 --- a/src/ShardingCore/ShardingPagedResult.cs +++ b/src/ShardingCore/Extensions/ShardingPageExtensions/ShardingPagedResult.cs @@ -1,6 +1,6 @@ using System.Collections.Generic; -namespace ShardingCore +namespace ShardingCore.Extensions.ShardingPageExtensions { /// /// 分页集合 diff --git a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs index 5212798d..8376feea 100644 --- a/src/ShardingCore/Sharding/AbstractShardingDbContext.cs +++ b/src/ShardingCore/Sharding/AbstractShardingDbContext.cs @@ -1,31 +1,16 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.ChangeTracking; -using Microsoft.EntityFrameworkCore.Storage; -using ShardingCore.Core; -using ShardingCore.Core.VirtualRoutes.TableRoutes; -using ShardingCore.Core.VirtualTables; -using ShardingCore.DbContexts; -using ShardingCore.DbContexts.ShardingDbContexts; -using ShardingCore.Exceptions; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.ReadWriteConfigurations; -using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Data; -using System.Data.Common; -using System.Linq; -using System.Linq.Expressions; -using System.Threading; -using System.Threading.Tasks; -using ShardingCore.Core.VirtualDatabase.VirtualDataSources; -using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources; -using ShardingCore.Core.VirtualDatabase.VirtualTables; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Sharding.ShardingDbContextExecutors; using ShardingCore.Sharding.ShardingTransactions; +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; namespace ShardingCore.Sharding { diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs new file mode 100644 index 00000000..cca4cf14 --- /dev/null +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.Sharding.MergeEngines.Abstractions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/10/2 17:25:33 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public abstract class AbstractBaseMergeEngine + { + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs similarity index 79% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs index bb3e2bfd..432f5fbe 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs @@ -1,12 +1,11 @@ using System; using System.Linq; using System.Linq.Expressions; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines { /* * @Author: xjm @@ -17,12 +16,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx */ public abstract class AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine : AbstractEnsureMethodCallInMemoryAsyncMergeEngine { - private readonly MethodCallExpression _methodCallExpression; private readonly TEntity _constantItem; protected AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { - _methodCallExpression = methodCallExpression; var secondExpression = GetSecondExpression(); if (!(secondExpression is ConstantExpression constantExpression)) { @@ -35,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx { if (!(secondExpression is ConstantExpression)) { - throw new InvalidOperationException(_methodCallExpression.ShardingPrint()); + throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint()); } return queryable; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs similarity index 77% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs index dee0a25c..f0051e67 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs @@ -1,13 +1,9 @@ -using System; -using System.Linq; -using System.Linq.Expressions; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.Enumerators; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs index 4943d2d1..6f19fc8b 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs @@ -1,12 +1,11 @@ using System; using System.Linq; using System.Linq.Expressions; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs similarity index 74% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs index 335f728d..25f5921f 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractEnsureMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -1,11 +1,10 @@ using System; using System.Linq; using System.Linq.Expressions; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines { /* * @Author: xjm @@ -16,11 +15,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx */ public abstract class AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine : AbstractEnsureMethodCallInMemoryAsyncMergeEngine { - private readonly MethodCallExpression _methodCallExpression; public AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { - _methodCallExpression = methodCallExpression; } protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) @@ -30,7 +27,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx return queryable.Where(predicate); } - throw new InvalidOperationException(_methodCallExpression.ShardingPrint()); + throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint()); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs similarity index 78% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs index bac73fa1..a6ed4f6a 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs @@ -1,13 +1,9 @@ -using System; -using System.Linq; -using System.Linq.Expressions; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.Enumerators; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs similarity index 85% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs index c14f4468..89782b37 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs @@ -1,12 +1,11 @@ using System; using System.Linq; using System.Linq.Expressions; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines { /* * @Author: xjm @@ -36,9 +35,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE return queryable; } - protected override IQueryable CombineQueryable(IQueryable _queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) { - return _queryable; + return queryable; } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs similarity index 75% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs index 78f2db0b..c7d1bf4a 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -1,11 +1,10 @@ using System; using System.Linq; using System.Linq.Expressions; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines { /* * @Author: xjm @@ -16,11 +15,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE */ public abstract class AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine: AbstractGenericMethodCallInMemoryAsyncMergeEngine { - private readonly MethodCallExpression _methodCallExpression; public AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { - _methodCallExpression = methodCallExpression; } protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) @@ -32,7 +29,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE return queryable.Where(predicate); } } - throw new InvalidOperationException(_methodCallExpression.ShardingPrint()); + throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint()); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs similarity index 90% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs index 2e4a7c90..5b81d8fb 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractGenericMergeEngines/AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -1,16 +1,11 @@ -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; +using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; -using ShardingCore.Core.TrackerManagers; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs similarity index 78% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs index f1ca7090..e11432ca 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/AbstractInMemoryAsyncMergeEngine.cs @@ -4,13 +4,12 @@ using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge { /* * @Author: xjm @@ -46,12 +45,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions _secondExpression = expression; } } - if(_queryable==null) + if (_queryable == null) throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]"); - if (methodCallExpression.Arguments.Count ==2) + if (methodCallExpression.Arguments.Count == 2) { - if(_secondExpression == null) + if (_secondExpression == null) throw new InvalidOperationException(methodCallExpression.ShardingPrint()); + // ReSharper disable once VirtualMemberCallInConstructor _queryable = CombineQueryable(_queryable, _secondExpression); } @@ -66,12 +66,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions /// protected abstract IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression); - private IQueryable CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult) + private IQueryable CreateAsyncExecuteQueryable(string dsname, TableRouteResult tableRouteResult) { - var shardingDbContext = _mergeContext.CreateDbContext(dsname,tableRouteResult); - var newQueryable = (IQueryable) GetStreamMergeContext().GetReWriteQueryable() + var shardingDbContext = _mergeContext.CreateDbContext(dsname, tableRouteResult); + var newQueryable = (IQueryable)GetStreamMergeContext().GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); - var newCombineQueryable= DoCombineQueryable(newQueryable); + var newCombineQueryable = DoCombineQueryable(newQueryable); return newCombineQueryable ; } @@ -84,21 +84,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions { return _mergeContext.TableRouteResults.Select(routeResult => { + var asyncExecuteQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult); + return Task.Run(async () => { - try - { - var asyncExecuteQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult); - var queryResult = await efQuery(asyncExecuteQueryable); - return new RouteQueryResult(dataSourceName, routeResult, queryResult); - //} - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); + var queryResult = await efQuery(asyncExecuteQueryable); + return new RouteQueryResult(dataSourceName, routeResult, queryResult); + + }, cancellationToken); }); }).ToArray(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IEnsureMergeResult.cs similarity index 82% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IEnsureMergeResult.cs index c069dc3f..f3526800 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IEnsureMergeResult.cs @@ -1,10 +1,7 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; +using System.Threading; using System.Threading.Tasks; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IGenericMergeResult.cs similarity index 83% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IGenericMergeResult.cs index 1eb1a928..718edd7b 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IGenericMergeResult.cs @@ -1,10 +1,7 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; +using System.Threading; using System.Threading.Tasks; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IInMemoryAsyncMergeEngine.cs similarity index 82% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IInMemoryAsyncMergeEngine.cs index 29a3b29c..eb29fa83 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/InMemoryMerge/IInMemoryAsyncMergeEngine.cs @@ -1,11 +1,11 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; +using ShardingCore.Sharding.StreamMergeEngines; -namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs new file mode 100644 index 00000000..fe51ca59 --- /dev/null +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/AbstractEnumeratorStreamMergeEngine.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x; +#if EFCORE2 +using Microsoft.EntityFrameworkCore.Extensions.Internal; +#endif + +namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/2 15:35:39 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public abstract class AbstractEnumeratorStreamMergeEngine : IEnumeratorStreamMergeEngine + { + public StreamMergeContext StreamMergeContext { get; } + + + public AbstractEnumeratorStreamMergeEngine(StreamMergeContext streamMergeContext) + { + StreamMergeContext = streamMergeContext; + } + + +#if !EFCORE2 + public IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = new CancellationToken()) + { + return GetStreamMergeAsyncEnumerator(true, cancellationToken); + } +#endif + +#if EFCORE2 + IAsyncEnumerator IAsyncEnumerable.GetEnumerator() + { + return GetStreamMergeAsyncEnumerator(true); + } + +#endif + + public IEnumerator GetEnumerator() + { + return GetStreamMergeAsyncEnumerator(false); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public void Dispose() + { + StreamMergeContext.Dispose(); + } + + + /// + /// 获取查询的迭代器 + /// + /// + /// + /// + private IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(bool async, + CancellationToken cancellationToken = new CancellationToken()) + { + cancellationToken.ThrowIfCancellationRequested(); + var dbStreamMergeAsyncEnumerators = GetRouteQueryStreamMergeAsyncEnumerators(async); + if (dbStreamMergeAsyncEnumerators.IsEmpty()) + throw new ShardingCoreException("GetRouteQueryStreamMergeAsyncEnumerators empty"); + return CombineStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators); + } + /// + /// 获取路由查询的迭代器 + /// + /// + /// + public abstract IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken()); + /// + /// 合并成一个迭代器 + /// + /// + /// + public abstract IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators); + /// + /// 开启异步线程获取并发迭代器 + /// + /// + /// + /// + /// + public Task> AsyncParallelQueryEnumerator(IQueryable queryable, bool async,CancellationToken cancellationToken=new CancellationToken()) + { + return Task.Run(async () => + { + if (async) + { + var asyncEnumerator = await GetAsyncEnumerator0(queryable); + return new StreamMergeAsyncEnumerator(asyncEnumerator); + } + else + { + var enumerator = GetEnumerator0(queryable); + return new StreamMergeAsyncEnumerator(enumerator); + } + }, cancellationToken); + } + /// + /// 获取异步迭代器 + /// + /// + /// + public async Task> GetAsyncEnumerator0(IQueryable newQueryable) + { +#if !EFCORE2 + var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); + await enumator.MoveNextAsync(); + return enumator; +#endif +#if EFCORE2 + var enumator = new EFCore2TryCurrentAsyncEnumerator(newQueryable.AsAsyncEnumerable().GetEnumerator()); + await enumator.MoveNext(); + return enumator; +#endif + } + /// + /// 获取同步迭代器 + /// + /// + /// + public IEnumerator GetEnumerator0(IQueryable newQueryable) + { + var enumator = newQueryable.AsEnumerable().GetEnumerator(); + enumator.MoveNext(); + return enumator; + } + + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/IEnumeratorStreamMergeEngine.cs similarity index 77% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/IEnumeratorStreamMergeEngine.cs index e454e26f..2d1da613 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/IEnumeratorStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/StreamMerge/IEnumeratorStreamMergeEngine.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions +namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs similarity index 97% rename from src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs index 8f54bbbc..86d836d5 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs @@ -11,8 +11,7 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs similarity index 89% rename from src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs index 9e004ed5..d87a3aa9 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs @@ -8,8 +8,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs similarity index 89% rename from src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs index cfd843fb..b7fb0904 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs similarity index 97% rename from src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs index 17d0efa6..bfd1c909 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs @@ -11,8 +11,7 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AllAsyncInMemoryMergeEngine.cs similarity index 89% rename from src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AllAsyncInMemoryMergeEngine.cs index 35ce5b69..62185f23 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AllAsyncInMemoryMergeEngine.cs @@ -10,8 +10,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/AnyAsyncInMemoryMergeEngine.cs similarity index 89% rename from src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/AnyAsyncInMemoryMergeEngine.cs index a5b9a224..5b38f207 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/AnyAsyncInMemoryMergeEngine.cs @@ -10,8 +10,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ContainsAsyncInMemoryMergeEngine.cs similarity index 88% rename from src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/ContainsAsyncInMemoryMergeEngine.cs index 2302f388..a170042f 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ContainsAsyncInMemoryMergeEngine.cs @@ -6,8 +6,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/CountAsyncInMemoryMergeEngine.cs similarity index 94% rename from src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/CountAsyncInMemoryMergeEngine.cs index 15ef8d4d..e99fc6e9 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/CountAsyncInMemoryMergeEngine.cs @@ -6,7 +6,7 @@ using System.Threading; using System.Threading.Tasks; using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Helpers; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/AsyncEnumeratorStreamMergeEngine.cs similarity index 72% rename from src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/AsyncEnumeratorStreamMergeEngine.cs index 000a571c..48974f7a 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/AsyncEnumeratorStreamMergeEngine.cs @@ -1,13 +1,12 @@ -using ShardingCore.Sharding.ShardingQueryExecutors; using System.Collections; using System.Collections.Generic; using System.Threading; using Microsoft.EntityFrameworkCore; -using ShardingCore.Core.TrackerManagers; using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.ShardingQueryExecutors; using ShardingCore.Sharding.StreamMergeEngines.TrackerEnumerators; -namespace ShardingCore.Sharding.StreamMergeEngines +namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines { /* * @Author: xjm @@ -15,12 +14,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Date: Saturday, 14 August 2021 22:07:28 * @Email: 326308290@qq.com */ - public class AsyncEnumerableStreamMergeEngine : IAsyncEnumerable, IEnumerable + public class AsyncEnumeratorStreamMergeEngine : IAsyncEnumerable, IEnumerable where TShardingDbContext:DbContext,IShardingDbContext { private readonly StreamMergeContext _mergeContext; - public AsyncEnumerableStreamMergeEngine(StreamMergeContext mergeContext) + public AsyncEnumeratorStreamMergeEngine(StreamMergeContext mergeContext) { _mergeContext = mergeContext; } @@ -29,7 +28,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines #if !EFCORE2 public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) { - var asyncEnumerator = new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync(cancellationToken) + cancellationToken.ThrowIfCancellationRequested(); + var asyncEnumerator = EnumeratorStreamMergeEngineFactory.Create(_mergeContext).GetMergeEngine() .GetAsyncEnumerator(cancellationToken); if (_mergeContext.IsUseShardingTrack(typeof(T))) @@ -44,7 +44,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines #if EFCORE2 IAsyncEnumerator IAsyncEnumerable.GetEnumerator() { - var asyncEnumerator = ((IAsyncEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) + var asyncEnumerator = ((IAsyncEnumerable)EnumeratorStreamMergeEngineFactory.Create(_mergeContext).GetMergeEngine()) .GetEnumerator(); if (_mergeContext.IsUseShardingTrack(typeof(T))) { @@ -57,7 +57,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines public IEnumerator GetEnumerator() { - var enumerator = ((IEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) + var enumerator = ((IEnumerable)EnumeratorStreamMergeEngineFactory.Create(_mergeContext).GetMergeEngine()) .GetEnumerator(); if (_mergeContext.IsUseShardingTrack(typeof(T))) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs similarity index 100% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Base/SequencePaginationList.cs diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs index 6ced731e..98e34836 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/AppenOrderSequenceEnumeratorAsyncStreamMergeEngine.cs @@ -11,8 +11,8 @@ using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; using ShardingCore.Sharding.PaginationConfigurations; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync @@ -24,7 +24,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { private readonly PaginationSequenceConfig _dataSourceSequenceOrderConfig; @@ -37,8 +37,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. _routeQueryResults = routeQueryResults; } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) + public override IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); var skip = StreamMergeContext.Skip.GetValueOrDefault(); if (skip < 0) @@ -104,7 +105,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var enumeratorTasks = sequenceResults.Select(sequenceResult => { var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult, reSetOrders); - return AsyncQueryEnumerator(newQueryable, async); + return AsyncParallelQueryEnumerator(newQueryable, async, cancellationToken); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); @@ -119,7 +120,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return newQueryable; } - public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { if (StreamMergeContext.HasGroupQuery()) return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs similarity index 77% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs index 7024b115..0127ff0e 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,5 +1,5 @@ -using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; @@ -7,9 +7,9 @@ using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync +namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { /* * @Author: xjm @@ -18,15 +18,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class DefaultShardingEnumeratorAsyncStreamMergeEngine :AbstractEnumeratorAsyncStreamMergeEngine + public class DefaultShardingEnumeratorAsyncStreamMergeEngine :AbstractEnumeratorStreamMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) + public override IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult; var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName => { @@ -34,7 +35,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return tableRouteResults.Select(routeResult => { var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult); - return AsyncQueryEnumerator(newQueryable, async); + return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken); }); }).ToArray(); @@ -51,7 +52,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return newQueryable; } - public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { if (StreamMergeContext.IsPaginationQuery()) return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs similarity index 87% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs index c7140f6e..9f84c3f3 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/ReverseShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Core.Internal.Visitors; @@ -10,7 +11,7 @@ using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { @@ -21,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class ReverseShardingEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + public class ReverseShardingEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine { private readonly long _total; @@ -30,8 +31,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. _total = total; } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) + public override IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var noPaginationNoOrderQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveAnyOrderBy(); var skip = StreamMergeContext.Skip.GetValueOrDefault(); var take = StreamMergeContext.Take.HasValue?StreamMergeContext.Take.Value:(_total-skip); @@ -49,7 +51,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return StreamMergeContext.TableRouteResults.Select(routeResult => { var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult); - return AsyncQueryEnumerator(newQueryable, async); + return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken); }); }).ToArray();; @@ -65,7 +67,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return newQueryable; } - public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamsAsyncEnumerators); return new InMemoryReverseStreamMergeAsyncEnumerator(doGetStreamMergeAsyncEnumerator); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs similarity index 90% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs index 8212af7b..6572068a 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SequenceEnumeratorAsyncStreamMergeEngine.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using ShardingCore.Exceptions; @@ -9,8 +10,8 @@ using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; using ShardingCore.Sharding.PaginationConfigurations; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync @@ -22,7 +23,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class SequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + public class SequenceEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { private readonly PaginationSequenceConfig _dataSourceSequenceMatchOrderConfig; @@ -37,8 +38,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. _isAsc = isAsc; } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) + public override IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); var skip = StreamMergeContext.Skip.GetValueOrDefault(); if (skip < 0) @@ -90,7 +92,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. var enumeratorTasks = sequenceResults.Select(sequenceResult => { var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult); - return AsyncQueryEnumerator(newQueryable, async); + return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken); }).ToArray(); var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); @@ -105,7 +107,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return newQueryable; } - public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { if (StreamMergeContext.HasGroupQuery()) return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs similarity index 73% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs index ae77db09..28d327f6 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorAsync/SingleQueryEnumeratorAsyncStreamMergeEngine.cs @@ -1,10 +1,11 @@ using System.Linq; +using System.Threading; using Microsoft.EntityFrameworkCore; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync { @@ -14,33 +15,34 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. * @Date: Thursday, 02 September 2021 20:58:10 * @Email: 326308290@qq.com */ - public class SingleQueryEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + public class SingleQueryEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine where TShardingDbContext : DbContext, IShardingDbContext { public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { } - public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async) + public override IStreamMergeAsyncEnumerator[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First(); var routeResult = StreamMergeContext.TableRouteResults.First(); var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult); var newQueryable = (IQueryable) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext); if (async) { - var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException(); + var asyncEnumerator = GetAsyncEnumerator0(newQueryable).WaitAndUnwrapException(); return new[] { new StreamMergeAsyncEnumerator(asyncEnumerator) }; } else { - var enumerator = DoGetEnumerator(newQueryable); + var enumerator = GetEnumerator0(newQueryable); return new[] { new StreamMergeAsyncEnumerator(enumerator) }; } } - public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + public override IStreamMergeAsyncEnumerator CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { if (streamsAsyncEnumerators.Length != 1) throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine)} has more {nameof(IStreamMergeAsyncEnumerator)}"); diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs similarity index 93% rename from src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs rename to src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs index 0ab75c2f..d4718c90 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs @@ -1,11 +1,8 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Linq.Expressions; -using System.Text; using System.Threading; using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.ShardingPage.Abstractions; using ShardingCore.Core.VirtualDatabase.VirtualDataSources; @@ -16,12 +13,12 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge; +using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync; using ShardingCore.Sharding.PaginationConfigurations; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines; -using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync; -namespace ShardingCore.Sharding.ShardingQueryExecutors +namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines { /* * @Author: xjm @@ -30,24 +27,28 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class EnumeratorShardingQueryExecutor + public class EnumeratorStreamMergeEngineFactory where TShardingDbContext : DbContext, IShardingDbContext { private readonly StreamMergeContext _streamMergeContext; private readonly IShardingPageManager _shardingPageManager; private readonly IVirtualTableManager _virtualTableManager; private readonly IVirtualDataSource _virtualDataSource; - public EnumeratorShardingQueryExecutor(StreamMergeContext streamMergeContext) + private EnumeratorStreamMergeEngineFactory(StreamMergeContext streamMergeContext) { _streamMergeContext = streamMergeContext; _shardingPageManager = ShardingContainer.GetService(); _virtualTableManager = ShardingContainer.GetService>(); _virtualDataSource = ShardingContainer.GetService>(); } - - public IEnumeratorStreamMergeEngine ExecuteAsync(CancellationToken cancellationToken = new CancellationToken()) + + public static EnumeratorStreamMergeEngineFactory Create(StreamMergeContext streamMergeContext) + { + return new EnumeratorStreamMergeEngineFactory(streamMergeContext); + } + + public IEnumeratorStreamMergeEngine GetMergeEngine() { - cancellationToken.ThrowIfCancellationRequested(); //本次查询没有跨库没有跨表就可以直接执行 if (!_streamMergeContext.IsCrossDataSource&&!_streamMergeContext.IsCrossTable) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/FirstAsyncInMemoryMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/FirstAsyncInMemoryMergeEngine.cs index e57d26cd..2659be72 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/FirstAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs similarity index 92% rename from src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs index b8255443..e716cc22 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs @@ -12,8 +12,7 @@ using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/LastAsyncInMemoryMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/LastAsyncInMemoryMergeEngine.cs index 237d64b3..451a4a4f 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/LastAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs index 04a7a1a3..db363a83 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/LongCountAsyncInMemoryMergeEngine.cs similarity index 92% rename from src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/LongCountAsyncInMemoryMergeEngine.cs index a02cbe5d..311014b8 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/LongCountAsyncInMemoryMergeEngine.cs @@ -12,8 +12,7 @@ using ShardingCore.Exceptions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/RouteQueryResult.cs b/src/ShardingCore/Sharding/MergeEngines/RouteQueryResult.cs similarity index 100% rename from src/ShardingCore/Sharding/StreamMergeEngines/RouteQueryResult.cs rename to src/ShardingCore/Sharding/MergeEngines/RouteQueryResult.cs diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/SingleAsyncInMemoryMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/SingleAsyncInMemoryMergeEngine.cs index 65a037d4..6bd3bf07 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/SingleAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs similarity index 91% rename from src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs rename to src/ShardingCore/Sharding/MergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs index e3d3da49..da98dcaa 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs @@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Extensions; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/TrackerEnumerators/AsyncTrackerEnumerator.cs b/src/ShardingCore/Sharding/MergeEngines/TrackerEnumerators/AsyncTrackerEnumerator.cs similarity index 100% rename from src/ShardingCore/Sharding/StreamMergeEngines/TrackerEnumerators/AsyncTrackerEnumerator.cs rename to src/ShardingCore/Sharding/MergeEngines/TrackerEnumerators/AsyncTrackerEnumerator.cs diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/TrackerEnumerators/TrackerEnumerator.cs b/src/ShardingCore/Sharding/MergeEngines/TrackerEnumerators/TrackerEnumerator.cs similarity index 100% rename from src/ShardingCore/Sharding/StreamMergeEngines/TrackerEnumerators/TrackerEnumerator.cs rename to src/ShardingCore/Sharding/MergeEngines/TrackerEnumerators/TrackerEnumerator.cs diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs index ded91357..5c2a4aa7 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs @@ -9,8 +9,9 @@ using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; +using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines; using ShardingCore.Sharding.StreamMergeEngines; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines; #if EFCORE2 using Microsoft.EntityFrameworkCore.Internal; @@ -46,7 +47,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors throw new ShardingCoreException("db context operator is not IShardingDbContext"); } - public TResult ExecuteAsync(ICurrentDbContext currentContext, Expression query,CancellationToken cancellationToken = new CancellationToken()) + public TResult ExecuteAsync(ICurrentDbContext currentContext, Expression query, CancellationToken cancellationToken = new CancellationToken()) { var currentDbContext = currentContext.Context; if (currentDbContext is IShardingDbContext shardingDbContext) @@ -70,7 +71,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors throw new ShardingCoreException("db context operator is not IShardingDbContext"); } - private TResult DoExecute(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken()) + private TResult DoExecute(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken()) { if (query is MethodCallExpression methodCallExpression) @@ -132,13 +133,13 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors // private readonly IStreamMergeContextFactory _streamMergeContextFactory; - var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod("Create"); + var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod("Create"); if (streamMergeContextMethod == null) throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamMergeContextFactory, new[] { queryable, shardingDbContext }); - Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<,>); + Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<,>); streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType); return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); } @@ -148,7 +149,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors { var queryEntityType = query.GetQueryEntityType(); var resultEntityType = query.GetResultType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(),queryEntityType); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs deleted file mode 100644 index 7218184d..00000000 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs +++ /dev/null @@ -1,100 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.EntityFrameworkCore; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; -using ShardingCore.Exceptions; -using ShardingCore.Extensions; -using ShardingCore.Sharding.Enumerators; -using ShardingCore.Sharding.Enumerators.StreamMergeAsync; -using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x; -#if EFCORE2 -using Microsoft.EntityFrameworkCore.Extensions.Internal; -#endif - -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions -{ - /* - * @Author: xjm - * @Description: - * @Date: 2021/9/2 15:38:05 - * @Ver: 1.0 - * @Email: 326308290@qq.com - */ - public abstract class AbstractEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorStreamMergeEngine - { - public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) - { - } - - - public abstract IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators(bool async); - public abstract IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators); - - public Task> AsyncQueryEnumerator(IQueryable queryable, bool async) - { - return Task.Run(async () => - { - try - { - if (async) - { - var asyncEnumerator = await DoGetAsyncEnumerator(queryable); - return new StreamMergeAsyncEnumerator(asyncEnumerator); - } - else - { - var enumerator = DoGetEnumerator(queryable); - return new StreamMergeAsyncEnumerator(enumerator); - - } - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } - }); - } - public async Task> DoGetAsyncEnumerator(IQueryable newQueryable) - { -#if !EFCORE2 - var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); - await enumator.MoveNextAsync(); - return enumator; -#endif -#if EFCORE2 - var enumator = new EFCore2TryCurrentAsyncEnumerator(newQueryable.AsAsyncEnumerable().GetEnumerator()); - await enumator.MoveNext(); - return enumator; -#endif - } - public IEnumerator DoGetEnumerator(IQueryable newQueryable) - { - var enumator = newQueryable.AsEnumerable().GetEnumerator(); - enumator.MoveNext(); - return enumator; - } - // public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult) - // { - // var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); - // var useOriginal = StreamMergeContext > 1; - // DbContextQueryStore.TryAdd(routeResult,shardingDbContext); - // var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) - // .ReplaceDbContextQueryable(shardingDbContext); - // return newQueryable; - // } - - public override IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, - CancellationToken cancellationToken = new CancellationToken()) - { - var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(async); - if (dbStreamMergeAsyncEnumerators.IsEmpty()) - throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty"); - return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators); - } - } -} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs deleted file mode 100644 index e38a7fcb..00000000 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs +++ /dev/null @@ -1,67 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using Microsoft.EntityFrameworkCore; -using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; -using ShardingCore.Extensions; -using ShardingCore.Sharding.Enumerators; - -namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions -{ - /* - * @Author: xjm - * @Description: - * @Date: 2021/9/2 15:35:39 - * @Ver: 1.0 - * @Email: 326308290@qq.com - */ - public abstract class AbstractEnumeratorStreamMergeEngine : IEnumeratorStreamMergeEngine - { - public StreamMergeContext StreamMergeContext { get; } - - - public AbstractEnumeratorStreamMergeEngine(StreamMergeContext streamMergeContext) - { - StreamMergeContext = streamMergeContext; - } - - public abstract IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, - CancellationToken cancellationToken = new CancellationToken()); - -#if !EFCORE2 - public IAsyncEnumerator GetAsyncEnumerator( - CancellationToken cancellationToken = new CancellationToken()) - { - return GetShardingAsyncEnumerator(true,cancellationToken); - } -#endif - -#if EFCORE2 - IAsyncEnumerator IAsyncEnumerable.GetEnumerator() - { - return GetShardingAsyncEnumerator(true); - } - -#endif - - public IEnumerator GetEnumerator() - { - return GetShardingAsyncEnumerator(false); - } - - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - public void Dispose() - { - StreamMergeContext.Dispose(); - } - - } -}