From 880b45e486c42d81b24b07a03a85fcbbddff0c46 Mon Sep 17 00:00:00 2001
From: xuejmnet <326308290@qq.com>
Date: Thu, 2 Sep 2021 21:19:46 +0800
Subject: [PATCH] 1
---
.../Extensions/StreamMergeContextExtension.cs | 30 ++++++++++++++
.../Abstractions/IShardingQueryExecutor.cs | 4 +-
.../EnumeratorShardingQueryExecutor.cs | 35 ++++++++++++----
...bstractEnumeratorAsyncStreamMergeEngine.cs | 18 ++++----
...ardingEnumeratorAsyncStreamMergeEngine.cs} | 20 ++++-----
...leQueryEnumeratorAsyncStreamMergeEngine.cs | 41 +++++++++++++++++++
6 files changed, 118 insertions(+), 30 deletions(-)
create mode 100644 src/ShardingCore/Extensions/StreamMergeContextExtension.cs
rename src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/{NormalEnumeratorAsyncStreamMergeEngine.cs => DefaultShardingEnumeratorAsyncStreamMergeEngine.cs} (72%)
create mode 100644 src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs
diff --git a/src/ShardingCore/Extensions/StreamMergeContextExtension.cs b/src/ShardingCore/Extensions/StreamMergeContextExtension.cs
new file mode 100644
index 00000000..19044d5d
--- /dev/null
+++ b/src/ShardingCore/Extensions/StreamMergeContextExtension.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Linq;
+using ShardingCore.Sharding;
+
+namespace ShardingCore.Extensions
+{
+/*
+* @Author: xjm
+* @Description:
+* @Date: Thursday, 02 September 2021 20:46:24
+* @Email: 326308290@qq.com
+*/
+ public static class StreamMergeContextExtension
+ {
+ ///
+ /// 本次查询是否涉及到分表
+ ///
+ ///
+ ///
+ ///
+ public static bool IsShardingQuery(this StreamMergeContext streamMergeContext)
+ {
+ return streamMergeContext.RouteResults.Count() > 1;
+ }
+ public static bool IsSingleShardingTableQuery(this StreamMergeContext streamMergeContext)
+ {
+ return streamMergeContext.RouteResults.First().ReplaceTables.Count(o => o.EntityType.IsShardingTable()) == 1;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs
index 43a6eb01..47d0e080 100644
--- a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs
+++ b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs
@@ -14,7 +14,7 @@ namespace ShardingCore.Sharding.Abstractions
public interface IShardingQueryExecutor
{
///
- /// ִͬлȡ
+ /// ͬ��ִ�л�ȡ���
///
///
///
@@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.Abstractions
///
TResult Execute(ICurrentDbContext currentContext, Expression query);
///
- /// 첽ִлȡ
+ /// �첽ִ�л�ȡ���
///
///
///
diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs
index 973269ca..47de1bce 100644
--- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs
+++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs
@@ -1,8 +1,14 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Linq.Expressions;
using System.Text;
+using System.Threading;
+using ShardingCore.Core.ShardingPage.Abstractions;
+using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
+using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines;
+using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
@@ -13,16 +19,31 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
- public class EnumeratorShardingQueryExecutor:IShardingQueryExecutor
+ public class EnumeratorShardingQueryExecutor
{
- public MethodCallExpression GetQueryExpression()
- {
- throw new NotImplementedException();
- }
+ private readonly StreamMergeContext _streamMergeContext;
+ private readonly IShardingPageManager _shardingPageManager;
- public IShardingDbContext GetCurrentShardingDbContext()
+ public EnumeratorShardingQueryExecutor(StreamMergeContext streamMergeContext)
{
- throw new NotImplementedException();
+ _streamMergeContext = streamMergeContext;
+ _shardingPageManager = ShardingContainer.GetService();
}
+ public IEnumeratorStreamMergeEngine ExecuteAsync(CancellationToken cancellationToken = new CancellationToken())
+ {
+ //操作单表
+ if (!_streamMergeContext.IsShardingQuery())
+ {
+ return new SingleQueryEnumeratorAsyncStreamMergeEngine(_streamMergeContext);
+ }
+ //未开启系统分表
+ if (_shardingPageManager.Current == null)
+ {
+ return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext);
+ }
+
+
+ }
+
}
}
diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs
index 44b2285f..b508e885 100644
--- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs
+++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs
@@ -42,15 +42,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
await enumator.MoveNextAsync();
return enumator;
}
- public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
- {
- var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
- var useOriginal = routeCount > 1;
- DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
- var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
- .ReplaceDbContextQueryable(shardingDbContext);
- return newQueryable;
- }
+ // public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult)
+ // {
+ // var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
+ // var useOriginal = StreamMergeContext > 1;
+ // DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
+ // var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
+ // .ReplaceDbContextQueryable(shardingDbContext);
+ // return newQueryable;
+ // }
public override IEnumerator GetEnumerator()
{
diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs
similarity index 72%
rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs
rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs
index fa6fa7c5..aeb38d68 100644
--- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs
+++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -18,26 +18,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
- public class NormalEnumeratorAsyncStreamMergeEngine:AbstractEnumeratorAsyncStreamMergeEngine
+ public class DefaultShardingEnumeratorAsyncStreamMergeEngine:AbstractEnumeratorAsyncStreamMergeEngine
{
- private readonly bool _multiRouteQuery;
- public NormalEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
+ public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
- _multiRouteQuery = streamMergeContext.RouteResults.Count() > 1;
}
public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators()
{
var tableResult = StreamMergeContext.RouteResults;
- var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
+ var newQueryable = CreateAsyncExecuteQueryable(routeResult);
return Task.Run(async () =>
{
try
{
- var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
-
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator(asyncEnumerator);
}
@@ -49,22 +45,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
});
}).ToArray();
- var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
+ var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
return streamEnumerators;
}
- public override IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
+ private IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
- var newQueryable = (IQueryable)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
+ var newQueryable = (IQueryable)StreamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators)
{
- if (_multiRouteQuery && StreamMergeContext.HasSkipTake())
+ if (StreamMergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators);
diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs
new file mode 100644
index 00000000..7a5b3aa8
--- /dev/null
+++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
+using ShardingCore.Extensions;
+using ShardingCore.Sharding.Enumerators;
+using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
+using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
+
+namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
+{
+/*
+* @Author: xjm
+* @Description:
+* @Date: Thursday, 02 September 2021 20:58:10
+* @Email: 326308290@qq.com
+*/
+ public class SingleQueryEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine
+ {
+ public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
+ {
+ }
+
+ public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators()
+ {
+ var routeResult = StreamMergeContext.RouteResults.First();
+ var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
+ DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
+ var newQueryable = (IQueryable) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
+
+ var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException();
+ return new[] {new StreamMergeAsyncEnumerator(asyncEnumerator)};
+ }
+
+
+ public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators)
+ {
+ return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators);
+ }
+ }
+}
\ No newline at end of file