diff --git a/src/ShardingCore/DIExtension.cs b/src/ShardingCore/DIExtension.cs index 013cbfcc..9e1d262e 100644 --- a/src/ShardingCore/DIExtension.cs +++ b/src/ShardingCore/DIExtension.cs @@ -18,6 +18,7 @@ using ShardingCore.Core.ShardingAccessors; using ShardingCore.Core.ShardingAccessors.Abstractions; using ShardingCore.Core.VirtualRoutes; using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions; +using ShardingCore.Sharding.ShardingQueryExecutors; namespace ShardingCore { @@ -88,6 +89,7 @@ namespace ShardingCore services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); return services; } diff --git a/src/ShardingCore/DefaultShardingDbContextCreatorConfig.cs b/src/ShardingCore/DefaultShardingDbContextCreatorConfig.cs index 9478f2fb..22095b55 100644 --- a/src/ShardingCore/DefaultShardingDbContextCreatorConfig.cs +++ b/src/ShardingCore/DefaultShardingDbContextCreatorConfig.cs @@ -1,12 +1,8 @@ using Microsoft.EntityFrameworkCore; -using ShardingCore.DbContexts.Abstractions; +using ShardingCore.DbContexts.ShardingDbContexts; using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using System; -using System.Collections.Generic; -using System.Linq; -using ShardingCore.Core.VirtualRoutes.TableRoutes; -using ShardingCore.DbContexts.ShardingDbContexts; namespace ShardingCore { diff --git a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs index 912f1658..5decac82 100644 --- a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs +++ b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs @@ -13,6 +13,7 @@ using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore.Internal; #if EFCORE2 using Microsoft.EntityFrameworkCore.Internal; @@ -29,178 +30,25 @@ namespace ShardingCore.EFCores public class ShardingQueryCompiler : IQueryCompiler { private readonly ICurrentDbContext _currentContext; - private readonly IStreamMergeContextFactory _streamMergeContextFactory; + private readonly IShardingQueryExecutor _shardingQueryExecutor; public ShardingQueryCompiler(ICurrentDbContext currentContext) { _currentContext = currentContext; - _streamMergeContextFactory = ShardingContainer.GetService(); + _shardingQueryExecutor = ShardingContainer.GetService(); } - private ICurrentDbContext GetCurrentDbContext() - { - return _currentContext; - } - private TResult EnumerableExecute(IShardingDbContext shardingDbContext, Expression query, bool async) - { - Type queryEntityType; - if (async) - queryEntityType = typeof(TResult).GetGenericArguments()[0]; - else - { - queryEntityType = query.Type.GetSequenceType(); - } - Type type = typeof(EnumerableQuery<>); - type = type.MakeGenericType(queryEntityType); - var queryable = Activator.CreateInstance(type, query); - - var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); - if (streamMergeContextMethod == null) - throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); - var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); - - - Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); - } public TResult Execute(Expression query) { - var async = false; - var currentDbContext = GetCurrentDbContext().Context; - - if (currentDbContext is IShardingDbContext shardingDbContext) - { - //如果根表达式为iqueryable表示需要迭代 - if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>))) - { - return EnumerableExecute(shardingDbContext, query, async); - } - - if (query is MethodCallExpression methodCallExpression) - { - switch (methodCallExpression.Method.Name) - { - - case nameof(Enumerable.First): - return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.FirstOrDefault): - return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Last): - return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.LastOrDefault): - return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Single): - return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.SingleOrDefault): - return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Count): - return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.LongCount): - return EnsureMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Any): - return EnsureMergeExecute(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.All): - return EnsureMergeExecute(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Max): - return GenericMergeExecute2(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Min): - return GenericMergeExecute2(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Sum): - return EnsureMergeExecute2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Average): - return EnsureMergeExecute2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); - case nameof(Enumerable.Contains): - return EnsureMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); - } - } - - -#if !EFCORE2 - throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]"); -#endif -#if EFCORE2 - throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]"); -#endif - //IQueryable queryable = new EnumerableQuery(expression); - //var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext); - - //var streamMergeEngine = AsyncEnumerableStreamMergeEngine.Create(streamMergeContext); - //return streamMergeEngine.GetAsyncEnumerator(); - } - - throw new ShardingCoreException("db context operator is not IShardingDbContext"); + return _shardingQueryExecutor.Execute(_currentContext, query); } #if !EFCORE2 public TResult ExecuteAsync(Expression query, CancellationToken cancellationToken) { - var currentDbContext = GetCurrentDbContext().Context; - var async = true; - - if (currentDbContext is IShardingDbContext shardingDbContext) - { - if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>))) - { - - return EnumerableExecute(shardingDbContext, query, async); - - } - - if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>))) - { - if (query is MethodCallExpression methodCallExpression) - { - switch (methodCallExpression.Method.Name) - { - - case nameof(Enumerable.First): - return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.FirstOrDefault): - return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Last): - return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.LastOrDefault): - return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Single): - return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.SingleOrDefault): - return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Count): - return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.LongCount): - return EnsureMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Any): - return EnsureMergeExecute(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.All): - return EnsureMergeExecute(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Max): - return GenericMergeExecute2(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Min): - return GenericMergeExecute2(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Sum): - return EnsureMergeExecute2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Average): - return EnsureMergeExecute2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Contains): - return EnsureMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - } - } - - } - - - throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]"); - //IQueryable queryable = new EnumerableQuery(expression); - //var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext); - - //var streamMergeEngine = AsyncEnumerableStreamMergeEngine.Create(streamMergeContext); - //return streamMergeEngine.GetAsyncEnumerator(); - } - - throw new ShardingCoreException("db context operator is not IShardingDbContext"); + return _shardingQueryExecutor.ExecuteAsync(_currentContext, query, cancellationToken); } public Func CreateCompiledQuery(Expression query) @@ -220,58 +68,6 @@ namespace ShardingCore.EFCores #endif - - private TResult GenericMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - var queryEntityType = query.GetQueryEntityType(); - var resultEntityType = query.GetResultType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params); - } - private TResult GenericMergeExecute2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - var queryEntityType = query.GetQueryEntityType(); - var resultType = query.GetResultType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType,resultType); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - //typeof(TResult)==?resultType - return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params); - } - - - private TResult EnsureMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType()); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (TResult)streamEngineMethod.Invoke(streamEngine, @params); - } - - private TResult EnsureMergeExecute2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - if (async) - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]); - else - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult)); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (TResult)streamEngineMethod.Invoke(streamEngine, @params); - } #if EFCORE2 private IAsyncEnumerable AsyncEnumerableExecute(IShardingDbContext shardingDbContext, Expression query) @@ -369,78 +165,12 @@ namespace ShardingCore.EFCores public IAsyncEnumerable ExecuteAsync(Expression query) { - var currentDbContext = GetCurrentDbContext().Context; - - if (currentDbContext is IShardingDbContext shardingDbContext) - { - return AsyncEnumerableExecute(shardingDbContext, query); - } - - throw new ShardingCoreException("db context operator is not IShardingDbContext"); + return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query, cancellationToken); } public Task ExecuteAsync(Expression query, CancellationToken cancellationToken) { - var currentDbContext = GetCurrentDbContext().Context; - var async = true; - - if (currentDbContext is IShardingDbContext shardingDbContext) - { - if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>))) - { - - return EnumerableExecuteAsync(shardingDbContext, query, async); - - } - - if (query is MethodCallExpression methodCallExpression) - { - switch (methodCallExpression.Method.Name) - { - - case nameof(Enumerable.First): - return GenericMergeExecuteAsync(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.FirstOrDefault): - return GenericMergeExecuteAsync(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Last): - return GenericMergeExecuteAsync(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.LastOrDefault): - return GenericMergeExecuteAsync(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Single): - return GenericMergeExecuteAsync(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.SingleOrDefault): - return GenericMergeExecuteAsync(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Count): - return EnsureMergeExecuteAsync(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.LongCount): - return EnsureMergeExecuteAsync(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Any): - return EnsureMergeExecuteAsync(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.All): - return EnsureMergeExecuteAsync(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Max): - return GenericMergeExecuteAsync2(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Min): - return GenericMergeExecuteAsync2(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); - case nameof(Enumerable.Sum): - return EnsureMergeExecuteAsync2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken); - case nameof(Enumerable.Average): - return EnsureMergeExecuteAsync2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression,cancellationToken); - case nameof(Enumerable.Contains): - return EnsureMergeExecuteAsync(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); - } - } - - - throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]"); - //IQueryable queryable = new EnumerableQuery(expression); - //var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext); - - //var streamMergeEngine = AsyncEnumerableStreamMergeEngine.Create(streamMergeContext); - //return streamMergeEngine.GetAsyncEnumerator(); - } - - throw new ShardingCoreException("db context operator is not IShardingDbContext"); + return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query, cancellationToken); } public Func CreateCompiledQuery(Expression query) diff --git a/src/ShardingCore/Extensions/TaskExtension.cs b/src/ShardingCore/Extensions/TaskExtension.cs new file mode 100644 index 00000000..cb2bf408 --- /dev/null +++ b/src/ShardingCore/Extensions/TaskExtension.cs @@ -0,0 +1,258 @@ +using System; +using System.Collections.Generic; +using System.Runtime.ExceptionServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ShardingCore.Extensions +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/1 10:22:00 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal static class TaskExtension + { + /// + /// 是否成功 + /// + /// + /// + public static bool IsCompletedSuccessfully(this Task task) + { + return task.IsCompleted && !(task.IsCanceled || task.IsFaulted); + } + + + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The task. May not be null. + public static void WaitAndUnwrapException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + task.GetAwaiter().GetResult(); + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The was cancelled before the completed, or the raised an . + public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + } + catch (AggregateException ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + throw ex; + } + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The type of the result of the task. + /// The task. May not be null. + /// The result of the task. + public static TResult WaitAndUnwrapException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + return task.GetAwaiter().GetResult(); + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The type of the result of the task. + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The result of the task. + /// The was cancelled before the completed, or the raised an . + public static TResult WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + return task.Result; + } + catch (AggregateException ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + throw ex; + } + } + + /// + /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. + /// + /// The task. May not be null. + public static void WaitWithoutException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(); + } + catch (AggregateException) + { + } + } + + /// + /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. + /// + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The was cancelled before the completed. + public static void WaitWithoutException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + } + catch (AggregateException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + } + } +} +static class TaskExtension +{ + /// + /// 是否成功 + /// + /// + /// + public static bool IsCompletedSuccessfully(this Task task) + { + return task.IsCompleted && !(task.IsCanceled || task.IsFaulted); + } + + + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The task. May not be null. + public static void WaitAndUnwrapException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + task.GetAwaiter().GetResult(); + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The was cancelled before the completed, or the raised an . + public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + } + catch (AggregateException ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + throw ex; + } + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The type of the result of the task. + /// The task. May not be null. + /// The result of the task. + public static TResult WaitAndUnwrapException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + return task.GetAwaiter().GetResult(); + } + + /// + /// Waits for the task to complete, unwrapping any exceptions. + /// + /// The type of the result of the task. + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The result of the task. + /// The was cancelled before the completed, or the raised an . + public static TResult WaitAndUnwrapException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + return task.Result; + } + catch (AggregateException ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + throw ex; + } + } + + /// + /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. + /// + /// The task. May not be null. + public static void WaitWithoutException(this Task task) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(); + } + catch (AggregateException) + { + } + } + + /// + /// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved. + /// + /// The task. May not be null. + /// A cancellation token to observe while waiting for the task to complete. + /// The was cancelled before the completed. + public static void WaitWithoutException(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + try + { + task.Wait(cancellationToken); + } + catch (AggregateException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + } +} +} diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs new file mode 100644 index 00000000..43a6eb01 --- /dev/null +++ b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs @@ -0,0 +1,34 @@ +using System.Linq.Expressions; +using System.Threading; +using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Sharding.Enumerators; + +namespace ShardingCore.Sharding.Abstractions +{ +/* +* @Author: xjm +* @Description: +* @Date: Friday, 27 August 2021 22:49:22 +* @Email: 326308290@qq.com +*/ + public interface IShardingQueryExecutor + { + /// + /// ִͬлȡ + /// + /// + /// + /// + /// + TResult Execute(ICurrentDbContext currentContext, Expression query); + /// + /// 첽ִлȡ + /// + /// + /// + /// + /// + /// + TResult ExecuteAsync(ICurrentDbContext currentContext, Expression query, CancellationToken cancellationToken = new CancellationToken()); + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Abstractions/IStreamMergeContextFactory.cs b/src/ShardingCore/Sharding/Abstractions/IStreamMergeContextFactory.cs index 7d5f3426..4c2db7bb 100644 --- a/src/ShardingCore/Sharding/Abstractions/IStreamMergeContextFactory.cs +++ b/src/ShardingCore/Sharding/Abstractions/IStreamMergeContextFactory.cs @@ -1,4 +1,5 @@ using System.Linq; +using System.Linq.Expressions; namespace ShardingCore.Sharding.Abstractions { @@ -10,6 +11,6 @@ namespace ShardingCore.Sharding.Abstractions */ public interface IStreamMergeContextFactory { - StreamMergeContext Create(IQueryable queryable,IShardingDbContext shardingDbContext); + StreamMergeContext Create(IQueryable queryable, IShardingDbContext shardingDbContext); } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/IShardingQueryExecutor.cs b/src/ShardingCore/Sharding/IShardingQueryExecutor.cs deleted file mode 100644 index 4633bf5c..00000000 --- a/src/ShardingCore/Sharding/IShardingQueryExecutor.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Linq; -using ShardingCore.Sharding.Enumerators; - -namespace ShardingCore.Sharding -{ -/* -* @Author: xjm -* @Description: -* @Date: Friday, 27 August 2021 22:49:22 -* @Email: 326308290@qq.com -*/ - public interface IShardingQueryExecutor - { - IStreamMergeAsyncEnumerator GetStreamMergeEnumerator(); - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/IPaginationConfiguration.cs b/src/ShardingCore/Sharding/PaginationConfigurations/IPaginationConfiguration.cs new file mode 100644 index 00000000..81cde904 --- /dev/null +++ b/src/ShardingCore/Sharding/PaginationConfigurations/IPaginationConfiguration.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Core; + +namespace ShardingCore.Sharding.PaginationConfigurations +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/1 17:32:36 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public interface IPaginationConfiguration where TEntity : class,IShardingTable + { + /// + /// Configures the entity of type . + /// + /// The builder to be used to configure the entity type. + void Configure(Paginati onBuilder builder); + } +} diff --git a/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs new file mode 100644 index 00000000..ed09ca1b --- /dev/null +++ b/src/ShardingCore/Sharding/PaginationConfigurations/PaginationBuilder.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Core; + +namespace ShardingCore.Sharding.PaginationConfigurations +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/1 17:33:12 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class PaginationBuilder where TEntity:class,IShardingTable + { + } +} diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs new file mode 100644 index 00000000..5718836e --- /dev/null +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/AbstractShardingQueryExecutor.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using ShardingCore.Sharding.Abstractions; + +namespace ShardingCore.Sharding.ShardingQueryExecutors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/8/30 17:11:40 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class AbstractShardingQueryExecutor: IShardingQueryExecutor + { + private readonly MethodCallExpression _expression; + private readonly IShardingDbContext _shardingDbContext; + + public AbstractShardingQueryExecutor(MethodCallExpression expression,IShardingDbContext shardingDbContext) + { + _expression = expression; + _shardingDbContext = shardingDbContext; + } + public MethodCallExpression GetQueryExpression() + { + return _expression; + } + + public IShardingDbContext GetCurrentShardingDbContext() + { + return _shardingDbContext; + } + } +} diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs new file mode 100644 index 00000000..7332a081 --- /dev/null +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs @@ -0,0 +1,202 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines; +using ShardingCore.Sharding.StreamMergeEngines.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines; + +namespace ShardingCore.Sharding.ShardingQueryExecutors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/1 7:47:05 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class DefaultShardingQueryExecutor : IShardingQueryExecutor + { + private readonly IStreamMergeContextFactory _streamMergeContextFactory; + + public DefaultShardingQueryExecutor(IStreamMergeContextFactory streamMergeContextFactory) + { + _streamMergeContextFactory = streamMergeContextFactory; + } + public TResult Execute(ICurrentDbContext currentContext, Expression query) + { + var currentDbContext = currentContext.Context; + + if (currentDbContext is IShardingDbContext shardingDbContext) + { + //如果根表达式为iqueryable表示需要迭代 + if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>))) + { + return EnumerableExecute(shardingDbContext, query, false); + } + + return DoExecute(shardingDbContext, query, false, default); + } + + throw new ShardingCoreException("db context operator is not IShardingDbContext"); + } + + public TResult ExecuteAsync(ICurrentDbContext currentContext, Expression query,CancellationToken cancellationToken = new CancellationToken()) + { + var currentDbContext = currentContext.Context; + if (currentDbContext is IShardingDbContext shardingDbContext) + { + if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>))) + { + + return EnumerableExecute(shardingDbContext, query, true); + + } + + if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>))) + { + return DoExecute(shardingDbContext, query, true, default); + + } + + + throw new ShardingCoreException($"db context operator not support query expression:[{query.ShardingPrint()}] result type:[{typeof(TResult).FullName}]"); + } + + throw new ShardingCoreException("db context operator is not IShardingDbContext"); + } + private TResult DoExecute(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken()) + { + + if (query is MethodCallExpression methodCallExpression) + { + switch (methodCallExpression.Method.Name) + { + + case nameof(Enumerable.First): + return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.FirstOrDefault): + return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Last): + return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.LastOrDefault): + return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Single): + return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.SingleOrDefault): + return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Count): + return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.LongCount): + return EnsureMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Any): + return EnsureMergeExecute(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.All): + return EnsureMergeExecute(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Max): + return GenericMergeExecute2(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Min): + return GenericMergeExecute2(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Sum): + return EnsureMergeExecute2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Average): + return EnsureMergeExecute2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); + case nameof(Enumerable.Contains): + return EnsureMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); + } + } + + + throw new ShardingCoreException($"db context operator not support query expression:[{query.ShardingPrint()}] result type:[{typeof(TResult).FullName}]"); + } + private TResult EnumerableExecute(IShardingDbContext shardingDbContext, Expression query, bool async) + { + Type queryEntityType; + if (async) + queryEntityType = typeof(TResult).GetGenericArguments()[0]; + else + { + queryEntityType = query.Type.GetSequenceType(); + } + Type type = typeof(EnumerableQuery<>); + type = type.MakeGenericType(queryEntityType); + var queryable = Activator.CreateInstance(type, query); + + var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); + if (streamMergeContextMethod == null) + throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); + var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); + + + Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); + return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); + } + + + private TResult GenericMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) + { + var queryEntityType = query.GetQueryEntityType(); + var resultEntityType = query.GetResultType(); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); + var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); + var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); + var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); + if (streamEngineMethod == null) + throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]"); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params); + } + private TResult GenericMergeExecute2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) + { + var queryEntityType = query.GetQueryEntityType(); + var resultType = query.GetResultType(); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, resultType); + var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); + var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); + var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); + if (streamEngineMethod == null) + throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]"); + var @params = async ? new object[] { cancellationToken } : new object[0]; + //typeof(TResult)==?resultType + return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params); + } + + + private TResult EnsureMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) + { + streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType()); + var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); + var methodName = async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult); + var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); + if (streamEngineMethod == null) + throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]"); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.Invoke(streamEngine, @params); + } + + private TResult EnsureMergeExecute2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) + { + if (async) + streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]); + else + streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult)); + var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); + var methodName = async + ? nameof(IEnsureMergeResult.MergeResultAsync) + : nameof(IEnsureMergeResult.MergeResult); + var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); + if (streamEngineMethod == null) + throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]"); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.Invoke(streamEngine, @params); + } + } +} diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs new file mode 100644 index 00000000..973269ca --- /dev/null +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/EnumeratorShardingQueryExecutor.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using ShardingCore.Sharding.Abstractions; + +namespace ShardingCore.Sharding.ShardingQueryExecutors +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/8/31 21:30:28 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class EnumeratorShardingQueryExecutor:IShardingQueryExecutor + { + public MethodCallExpression GetQueryExpression() + { + throw new NotImplementedException(); + } + + public IShardingDbContext GetCurrentShardingDbContext() + { + throw new NotImplementedException(); + } + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index bfb74209..77d6a6eb 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -35,6 +35,7 @@ namespace ShardingCore.Sharding public SelectContext SelectContext { get;} public GroupByContext GroupByContext { get; } + public IEnumerable RouteResults { get; } public StreamMergeContext(IQueryable source,IShardingDbContext shardingDbContext,IRoutingRuleEngineFactory tableRoutingRuleEngineFactory, IRouteTailFactory routeTailFactory) { @@ -50,6 +51,7 @@ namespace ShardingCore.Sharding SelectContext = reWriteResult.SelectContext; GroupByContext = reWriteResult.GroupByContext; _reWriteSource = reWriteResult.ReWriteQueryable; + RouteResults = _tableRoutingRuleEngineFactory.Route(_shardingDbContext.GetType(), _source); } //public StreamMergeContext(IQueryable source,IEnumerable routeResults, // IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory) @@ -72,10 +74,6 @@ namespace ShardingCore.Sharding var routeTail = _routeTailFactory.Create(routeResult); return _shardingDbContext.GetDbContext(false, routeTail); } - public IEnumerable GetRouteResults() - { - return _tableRoutingRuleEngineFactory.Route(_shardingDbContext.GetType(),_source); - } public IRouteTail Create(RouteResult routeResult) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs index f87a8bf2..49c9dc8d 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine.cs @@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx } _constantItem = (TEntity)constantExpression.Value; } - protected override IQueryable ProcessSecondExpression(IQueryable queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) { if (!(secondExpression is ConstantExpression)) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs index c7aa7ca0..bebb75db 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine.cs @@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx { } - public override IQueryable EFQueryAfterFilter(IQueryable queryable) + public override IQueryable DoCombineQueryable(IQueryable queryable) { var secondExpression = GetSecondExpression(); if (secondExpression != null) @@ -41,7 +41,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx return queryable; } - protected override IQueryable ProcessSecondExpression(IQueryable _queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable _queryable, Expression secondExpression) { return _queryable; } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs index d984d852..6e79cda4 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureExpressionMergeEngines/AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx _methodCallExpression = methodCallExpression; } - protected override IQueryable ProcessSecondExpression(IQueryable queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) { if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression> predicate) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs index 703e1e80..2685e904 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine.cs @@ -20,7 +20,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE { } - public override IQueryable EFQueryAfterFilter(IQueryable queryable) + public override IQueryable DoCombineQueryable(IQueryable queryable) { var secondExpression = GetSecondExpression(); if (secondExpression != null) @@ -41,7 +41,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE return queryable; } - protected override IQueryable ProcessSecondExpression(IQueryable _queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable _queryable, Expression secondExpression) { return _queryable; } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs index 4d15fdbb..5c88b204 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericExpressionMergeEngines/AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine.cs @@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE _methodCallExpression = methodCallExpression; } - protected override IQueryable ProcessSecondExpression(IQueryable queryable, Expression secondExpression) + protected override IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression) { if (secondExpression is UnaryExpression where) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs index 6bd5017a..da687eae 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs @@ -30,28 +30,43 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) { _methodCallExpression = methodCallExpression; - var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type)) - ?? throw new InvalidOperationException(methodCallExpression.ShardingPrint()); - _queryable = new EnumerableQuery(expression); - _secondExpression = methodCallExpression.Arguments.FirstOrDefault(o => !typeof(IQueryable).IsAssignableFrom(o.Type)); - - if (_secondExpression != null) + if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2) + throw new ArgumentException($"argument count must 1 or 2 :[{methodCallExpression.ShardingPrint()}]"); + for (int i = 0; i < methodCallExpression.Arguments.Count; i++) { - _queryable = ProcessSecondExpression(_queryable, _secondExpression); - } - else - { - if (methodCallExpression.Arguments.Count == 2) + var expression = methodCallExpression.Arguments[i]; + if (typeof(IQueryable).IsAssignableFrom(expression.Type)) { - throw new InvalidOperationException(methodCallExpression.ShardingPrint()); + if (_queryable != null) + throw new ArgumentException( + $"argument found more 1 IQueryable :[{methodCallExpression.ShardingPrint()}]"); + _queryable = new EnumerableQuery(expression); + } + else + { + _secondExpression = expression; } } + if(_queryable==null) + throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]"); + if (methodCallExpression.Arguments.Count ==2) + { + if(_secondExpression == null) + throw new InvalidOperationException(methodCallExpression.ShardingPrint()); + _queryable = CombineQueryable(_queryable, _secondExpression); + } + _mergeContext = ShardingContainer.GetService().Create(_queryable, shardingDbContext); _parllelDbbContexts = new List(); } - - protected abstract IQueryable ProcessSecondExpression(IQueryable queryable, Expression secondExpression); + /// + /// 合并queryable + /// + /// + /// + /// + protected abstract IQueryable CombineQueryable(IQueryable queryable, Expression secondExpression); private IQueryable CreateAsyncExecuteQueryable(RouteResult routeResult) { @@ -59,13 +74,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions _parllelDbbContexts.Add(shardingDbContext); var newQueryable = (IQueryable) GetStreamMergeContext().GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); - var newFilterQueryable = EFQueryAfterFilter(newQueryable); - return newFilterQueryable; + var newCombineQueryable= DoCombineQueryable(newQueryable); + return newCombineQueryable +; } - public async Task> ExecuteAsync(Func> efQuery, CancellationToken cancellationToken = new CancellationToken()) + public async Task>> ExecuteAsync(Func> efQuery, CancellationToken cancellationToken = new CancellationToken()) { - var tableResult = _mergeContext.GetRouteResults(); + var tableResult = _mergeContext.RouteResults; var enumeratorTasks = tableResult.Select(routeResult => { return Task.Run(async () => @@ -73,7 +89,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions try { var asyncExecuteQueryable = CreateAsyncExecuteQueryable(routeResult); - return await efQuery(asyncExecuteQueryable); + var queryResult= await efQuery(asyncExecuteQueryable); + return new RouteQueryResult(routeResult, queryResult); //} } catch (Exception e) @@ -111,7 +128,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions } - public virtual IQueryable EFQueryAfterFilter(IQueryable queryable) + public virtual IQueryable DoCombineQueryable(IQueryable queryable) { return queryable; } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs index a1c0721f..29a3b29c 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IInMemoryAsyncMergeEngine.cs @@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions public interface IInMemoryAsyncMergeEngine { StreamMergeContext GetStreamMergeContext(); - Task> ExecuteAsync(Func> efQuery, + Task>> ExecuteAsync(Func> efQuery, CancellationToken cancellationToken = new CancellationToken()); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs index 20c7c9b2..f352e024 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; @@ -29,9 +30,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override bool MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Any()); - - return result.All(o => o); + return AsyncHelper.RunSync(()=> MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs index e938f766..6e7bc57e 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs @@ -1,10 +1,10 @@ using Microsoft.EntityFrameworkCore; using ShardingCore.Sharding.Abstractions; -using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using System.Linq; using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; +using ShardingCore.Helpers; using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines; namespace ShardingCore.Sharding.StreamMergeEngines @@ -24,16 +24,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines public override int MergeResult() { - var result = base.Execute( queryable => ((IQueryable)queryable).Count()); - - return result.Sum(); + return AsyncHelper.RunSync(() => MergeResultAsync()); } public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).CountAsync(cancellationToken), cancellationToken); - return result.Sum(); + return result.Sum(o=>o.QueryResult); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/RouteQueryResult.cs b/src/ShardingCore/Sharding/StreamMergeEngines/RouteQueryResult.cs new file mode 100644 index 00000000..a00b347c --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/RouteQueryResult.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; + +namespace ShardingCore.Sharding.StreamMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/1 13:13:17 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class RouteQueryResult + { + public RouteResult RouteResult { get; } + public TResult QueryResult { get; } + + public RouteQueryResult(RouteResult routeResult,TResult queryResult) + { + RouteResult = routeResult; + QueryResult = queryResult; + } + } +} diff --git a/src/ShardingCore/ShardingPagedResult.cs b/src/ShardingCore/ShardingPagedResult.cs index f56c8360..484dd632 100644 --- a/src/ShardingCore/ShardingPagedResult.cs +++ b/src/ShardingCore/ShardingPagedResult.cs @@ -15,7 +15,7 @@ namespace ShardingCore /// /// 总记录数。 /// 当前页面的数据。 - public ShardingPagedResult(List data, int total) + public ShardingPagedResult(List data, long total) { this.Total = total; this.Data = data; @@ -26,7 +26,7 @@ namespace ShardingCore /// /// 获取或设置总记录数。 /// - public int Total { get; set; } + public long Total { get; set; } /// /// 分页数据 ///