diff --git a/src/ShardingCore/Extensions/StreamMergeContextExtension.cs b/src/ShardingCore/Extensions/StreamMergeContextExtension.cs new file mode 100644 index 00000000..19044d5d --- /dev/null +++ b/src/ShardingCore/Extensions/StreamMergeContextExtension.cs @@ -0,0 +1,30 @@ +using System; +using System.Linq; +using ShardingCore.Sharding; + +namespace ShardingCore.Extensions +{ +/* +* @Author: xjm +* @Description: +* @Date: Thursday, 02 September 2021 20:46:24 +* @Email: 326308290@qq.com +*/ + public static class StreamMergeContextExtension + { + /// + /// 本次查询是否涉及到分表 + /// + /// + /// + /// + public static bool IsShardingQuery(this StreamMergeContext streamMergeContext) + { + return streamMergeContext.RouteResults.Count() > 1; + } + public static bool IsSingleShardingTableQuery(this StreamMergeContext streamMergeContext) + { + return streamMergeContext.RouteResults.First().ReplaceTables.Count(o => o.EntityType.IsShardingTable()) == 1; + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs index 43a6eb01..47d0e080 100644 --- a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs @@ -14,7 +14,7 @@ namespace ShardingCore.Sharding.Abstractions public interface IShardingQueryExecutor { /// - /// ִͬлȡ + /// ͬ��ִ�л�ȡ��� /// /// /// @@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.Abstractions /// TResult Execute(ICurrentDbContext currentContext, Expression query); /// - /// 첽ִлȡ + /// �첽ִ�л�ȡ��� /// /// /// diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs index 973269ca..47de1bce 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs @@ -1,8 +1,14 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Linq.Expressions; using System.Text; +using System.Threading; +using ShardingCore.Core.ShardingPage.Abstractions; +using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; namespace ShardingCore.Sharding.ShardingQueryExecutors { @@ -13,16 +19,31 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class EnumeratorShardingQueryExecutor:IShardingQueryExecutor + public class EnumeratorShardingQueryExecutor { - public MethodCallExpression GetQueryExpression() - { - throw new NotImplementedException(); - } + private readonly StreamMergeContext _streamMergeContext; + private readonly IShardingPageManager _shardingPageManager; - public IShardingDbContext GetCurrentShardingDbContext() + public EnumeratorShardingQueryExecutor(StreamMergeContext streamMergeContext) { - throw new NotImplementedException(); + _streamMergeContext = streamMergeContext; + _shardingPageManager = ShardingContainer.GetService(); } + public IEnumeratorStreamMergeEngine ExecuteAsync(CancellationToken cancellationToken = new CancellationToken()) + { + //操作单表 + if (!_streamMergeContext.IsShardingQuery()) + { + return new SingleQueryEnumeratorAsyncStreamMergeEngine(_streamMergeContext); + } + //未开启系统分表 + if (_shardingPageManager.Current == null) + { + return new DefaultShardingEnumeratorAsyncStreamMergeEngine(_streamMergeContext); + } + + + } + } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs index 44b2285f..b508e885 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs @@ -42,15 +42,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. await enumator.MoveNextAsync(); return enumator; } - public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) - { - var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); - var useOriginal = routeCount > 1; - DbContextQueryStore.TryAdd(routeResult,shardingDbContext); - var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) - .ReplaceDbContextQueryable(shardingDbContext); - return newQueryable; - } + // public virtual IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult) + // { + // var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + // var useOriginal = StreamMergeContext > 1; + // DbContextQueryStore.TryAdd(routeResult,shardingDbContext); + // var newQueryable = (IQueryable)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + // .ReplaceDbContextQueryable(shardingDbContext); + // return newQueryable; + // } public override IEnumerator GetEnumerator() { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs similarity index 72% rename from src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs index fa6fa7c5..aeb38d68 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/NormalEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/DefaultShardingEnumeratorAsyncStreamMergeEngine.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -18,26 +18,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines * @Ver: 1.0 * @Email: 326308290@qq.com */ - public class NormalEnumeratorAsyncStreamMergeEngine:AbstractEnumeratorAsyncStreamMergeEngine + public class DefaultShardingEnumeratorAsyncStreamMergeEngine:AbstractEnumeratorAsyncStreamMergeEngine { - private readonly bool _multiRouteQuery; - public NormalEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) { - _multiRouteQuery = streamMergeContext.RouteResults.Count() > 1; } public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() { var tableResult = StreamMergeContext.RouteResults; - var routeCount = tableResult.Count(); var enumeratorTasks = tableResult.Select(routeResult => { + var newQueryable = CreateAsyncExecuteQueryable(routeResult); return Task.Run(async () => { try { - var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount); - var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable); return new StreamMergeAsyncEnumerator(asyncEnumerator); } @@ -49,22 +45,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines }); }).ToArray(); - var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); + var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException(); return streamEnumerators; } - public override IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount) + private IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult) { var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); DbContextQueryStore.TryAdd(routeResult, shardingDbContext); - var newQueryable = (IQueryable)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable()) + var newQueryable = (IQueryable)StreamMergeContext.GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); return newQueryable; } public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) { - if (_multiRouteQuery && StreamMergeContext.HasSkipTake()) + if (StreamMergeContext.HasSkipTake()) return new PaginationStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); if (StreamMergeContext.HasGroupQuery()) return new MultiAggregateOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs new file mode 100644 index 00000000..7a5b3aa8 --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/SingleQueryEnumeratorAsyncStreamMergeEngine.cs @@ -0,0 +1,41 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions; + +namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines +{ +/* +* @Author: xjm +* @Description: +* @Date: Thursday, 02 September 2021 20:58:10 +* @Email: 326308290@qq.com +*/ + public class SingleQueryEnumeratorAsyncStreamMergeEngine : AbstractEnumeratorAsyncStreamMergeEngine + { + public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext) + { + } + + public override IStreamMergeAsyncEnumerator[] GetDbStreamMergeAsyncEnumerators() + { + var routeResult = StreamMergeContext.RouteResults.First(); + var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult); + DbContextQueryStore.TryAdd(routeResult, shardingDbContext); + var newQueryable = (IQueryable) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext); + + var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException(); + return new[] {new StreamMergeAsyncEnumerator(asyncEnumerator)}; + } + + + public override IStreamMergeAsyncEnumerator GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator[] streamsAsyncEnumerators) + { + return new MultiOrderStreamMergeAsyncEnumerator(StreamMergeContext, streamsAsyncEnumerators); + } + } +} \ No newline at end of file