改了一部分代码

This commit is contained in:
xuejiaming 2021-09-01 17:34:57 +08:00
parent 4dbb113455
commit af71145aee
24 changed files with 691 additions and 342 deletions

View File

@ -18,6 +18,7 @@ using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.ShardingAccessors.Abstractions;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
namespace ShardingCore
{
@ -88,6 +89,7 @@ namespace ShardingCore
services.AddSingleton<IRouteTailFactory, RouteTailFactory>();
services.AddSingleton<IShardingRouteManager, ShardingRouteManager>();
services.AddSingleton<IShardingRouteAccessor, ShardingRouteAccessor>();
services.AddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();
return services;
}

View File

@ -1,12 +1,8 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.DbContexts.Abstractions;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.DbContexts.ShardingDbContexts;
namespace ShardingCore
{

View File

@ -13,6 +13,7 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Internal;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
@ -29,178 +30,25 @@ namespace ShardingCore.EFCores
public class ShardingQueryCompiler : IQueryCompiler
{
private readonly ICurrentDbContext _currentContext;
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
private readonly IShardingQueryExecutor _shardingQueryExecutor;
public ShardingQueryCompiler(ICurrentDbContext currentContext)
{
_currentContext = currentContext;
_streamMergeContextFactory = ShardingContainer.GetService<IStreamMergeContextFactory>();
_shardingQueryExecutor = ShardingContainer.GetService<IShardingQueryExecutor>();
}
private ICurrentDbContext GetCurrentDbContext()
{
return _currentContext;
}
private TResult EnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async)
{
Type queryEntityType;
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
public TResult Execute<TResult>(Expression query)
{
var async = false;
var currentDbContext = GetCurrentDbContext().Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
//如果根表达式为iqueryable表示需要迭代
if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>)))
{
return EnumerableExecute<TResult>(shardingDbContext, query, async);
}
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Last):
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Single):
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.LongCount):
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Any):
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.All):
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Max):
return GenericMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Min):
return GenericMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Sum):
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Average):
return EnsureMergeExecute2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default);
case nameof(Enumerable.Contains):
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default);
}
}
#if !EFCORE2
throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]");
#endif
#if EFCORE2
throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]");
#endif
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
return _shardingQueryExecutor.Execute<TResult>(_currentContext, query);
}
#if !EFCORE2
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
var async = true;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
return EnumerableExecute<TResult>(shardingDbContext, query, async);
}
if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>)))
{
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last):
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single):
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Max):
return GenericMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Min):
return GenericMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Sum):
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Average):
return EnsureMergeExecute2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Contains):
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
}
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]");
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
return _shardingQueryExecutor.ExecuteAsync<TResult>(_currentContext, query, cancellationToken);
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
@ -220,58 +68,6 @@ namespace ShardingCore.EFCores
#endif
private TResult GenericMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
var resultEntityType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
}
private TResult GenericMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
var resultType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType,resultType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
//typeof(TResult)==?resultType
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
else
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
#if EFCORE2
private IAsyncEnumerable<TResult> AsyncEnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query)
@ -369,78 +165,12 @@ namespace ShardingCore.EFCores
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
var currentDbContext = GetCurrentDbContext().Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
return AsyncEnumerableExecute<TResult>(shardingDbContext, query);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query, cancellationToken);
}
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
var async = true;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
return EnumerableExecuteAsync<TResult>(shardingDbContext, query, async);
}
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecuteAsync<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last):
return GenericMergeExecuteAsync<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single):
return GenericMergeExecuteAsync<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecuteAsync<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecuteAsync<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecuteAsync<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Max):
return GenericMergeExecuteAsync2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Min):
return GenericMergeExecuteAsync2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Sum):
return EnsureMergeExecuteAsync2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.Average):
return EnsureMergeExecuteAsync2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression,cancellationToken);
case nameof(Enumerable.Contains):
return EnsureMergeExecuteAsync<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]");
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(_currentContext, query, cancellationToken);
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)

View File

@ -0,0 +1,258 @@
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/1 10:22:00
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal static class TaskExtension
{
/// <summary>
/// 是否成功
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
public static bool IsCompletedSuccessfully(this Task task)
{
return task.IsCompleted && !(task.IsCanceled || task.IsFaulted);
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitAndUnwrapException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <returns>The result of the task.</returns>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
return task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <returns>The result of the task.</returns>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
return task.Result;
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitWithoutException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait();
}
catch (AggregateException)
{
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed.</exception>
public static void WaitWithoutException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException)
{
cancellationToken.ThrowIfCancellationRequested();
}
}
}
}
static class TaskExtension
{
/// <summary>
/// 是否成功
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
public static bool IsCompletedSuccessfully(this Task task)
{
return task.IsCompleted && !(task.IsCanceled || task.IsFaulted);
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitAndUnwrapException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static void WaitAndUnwrapException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <returns>The result of the task.</returns>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
return task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult">The type of the result of the task.</typeparam>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <returns>The result of the task.</returns>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed, or the <paramref name="task"/> raised an <see cref="OperationCanceledException"/>.</exception>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
return task.Result;
}
catch (AggregateException ex)
{
ExceptionDispatchInfo.Capture(ex).Throw();
throw ex;
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
public static void WaitWithoutException(this Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait();
}
catch (AggregateException)
{
}
}
/// <summary>
/// Waits for the task to complete, but does not raise task exceptions. The task exception (if any) is unobserved.
/// </summary>
/// <param name="task">The task. May not be <c>null</c>.</param>
/// <param name="cancellationToken">A cancellation token to observe while waiting for the task to complete.</param>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was cancelled before the <paramref name="task"/> completed.</exception>
public static void WaitWithoutException(this Task task, CancellationToken cancellationToken)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
try
{
task.Wait(cancellationToken);
}
catch (AggregateException)
{
cancellationToken.ThrowIfCancellationRequested();
}
}
}
}

View File

@ -0,0 +1,34 @@
using System.Linq.Expressions;
using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 27 August 2021 22:49:22
* @Email: 326308290@qq.com
*/
public interface IShardingQueryExecutor
{
/// <summary>
/// 同步执行获取结果
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="currentContext"></param>
/// <param name="query"></param>
/// <returns></returns>
TResult Execute<TResult>(ICurrentDbContext currentContext, Expression query);
/// <summary>
/// 异步执行获取结果
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="currentContext"></param>
/// <param name="query"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
TResult ExecuteAsync<TResult>(ICurrentDbContext currentContext, Expression query, CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -1,4 +1,5 @@
using System.Linq;
using System.Linq.Expressions;
namespace ShardingCore.Sharding.Abstractions
{
@ -10,6 +11,6 @@ namespace ShardingCore.Sharding.Abstractions
*/
public interface IStreamMergeContextFactory
{
StreamMergeContext<T> Create<T>(IQueryable<T> queryable,IShardingDbContext shardingDbContext);
StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IShardingDbContext shardingDbContext);
}
}

View File

@ -1,17 +0,0 @@
using System;
using System.Linq;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding
{
/*
* @Author: xjm
* @Description:
* @Date: Friday, 27 August 2021 22:49:22
* @Email: 326308290@qq.com
*/
public interface IShardingQueryExecutor<T>
{
IStreamMergeAsyncEnumerator<T> GetStreamMergeEnumerator();
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core;
namespace ShardingCore.Sharding.PaginationConfigurations
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/1 17:32:36
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IPaginationConfiguration<TEntity> where TEntity : class,IShardingTable
{
/// <summary>
/// Configures the entity of type <typeparamref name="TEntity" />.
/// </summary>
/// <param name="builder"> The builder to be used to configure the entity type. </param>
void Configure(Paginati onBuilder<TEntity> builder);
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core;
namespace ShardingCore.Sharding.PaginationConfigurations
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/1 17:33:12
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class PaginationBuilder<TEntity> where TEntity:class,IShardingTable
{
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/30 17:11:40
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AbstractShardingQueryExecutor<TEntity>: IShardingQueryExecutor<TEntity>
{
private readonly MethodCallExpression _expression;
private readonly IShardingDbContext _shardingDbContext;
public AbstractShardingQueryExecutor(MethodCallExpression expression,IShardingDbContext shardingDbContext)
{
_expression = expression;
_shardingDbContext = shardingDbContext;
}
public MethodCallExpression GetQueryExpression()
{
return _expression;
}
public IShardingDbContext GetCurrentShardingDbContext()
{
return _shardingDbContext;
}
}
}

View File

@ -0,0 +1,202 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/1 7:47:05
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class DefaultShardingQueryExecutor : IShardingQueryExecutor
{
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
public DefaultShardingQueryExecutor(IStreamMergeContextFactory streamMergeContextFactory)
{
_streamMergeContextFactory = streamMergeContextFactory;
}
public TResult Execute<TResult>(ICurrentDbContext currentContext, Expression query)
{
var currentDbContext = currentContext.Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
//如果根表达式为iqueryable表示需要迭代
if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>)))
{
return EnumerableExecute<TResult>(shardingDbContext, query, false);
}
return DoExecute<TResult>(shardingDbContext, query, false, default);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public TResult ExecuteAsync<TResult>(ICurrentDbContext currentContext, Expression query,CancellationToken cancellationToken = new CancellationToken())
{
var currentDbContext = currentContext.Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
return EnumerableExecute<TResult>(shardingDbContext, query, true);
}
if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>)))
{
return DoExecute<TResult>(shardingDbContext, query, true, default);
}
throw new ShardingCoreException($"db context operator not support query expression:[{query.ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
private TResult DoExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken())
{
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last):
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single):
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Max):
return GenericMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Min):
return GenericMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Sum):
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Average):
return EnsureMergeExecute2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Contains):
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{query.ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
}
private TResult EnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async)
{
Type queryEntityType;
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
private TResult GenericMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
var resultEntityType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null)
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
}
private TResult GenericMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
var resultType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, resultType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null)
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
var @params = async ? new object[] { cancellationToken } : new object[0];
//typeof(TResult)==?resultType
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null)
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
else
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async
? nameof(IEnsureMergeResult<object>.MergeResultAsync)
: nameof(IEnsureMergeResult<object>.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null)
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/31 21:30:28
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class EnumeratorShardingQueryExecutor<TEntity>:IShardingQueryExecutor<TEntity>
{
public MethodCallExpression GetQueryExpression()
{
throw new NotImplementedException();
}
public IShardingDbContext GetCurrentShardingDbContext()
{
throw new NotImplementedException();
}
}
}

View File

@ -35,6 +35,7 @@ namespace ShardingCore.Sharding
public SelectContext SelectContext { get;}
public GroupByContext GroupByContext { get; }
public IEnumerable<RouteResult> RouteResults { get; }
public StreamMergeContext(IQueryable<T> source,IShardingDbContext shardingDbContext,IRoutingRuleEngineFactory tableRoutingRuleEngineFactory, IRouteTailFactory routeTailFactory)
{
@ -50,6 +51,7 @@ namespace ShardingCore.Sharding
SelectContext = reWriteResult.SelectContext;
GroupByContext = reWriteResult.GroupByContext;
_reWriteSource = reWriteResult.ReWriteQueryable;
RouteResults = _tableRoutingRuleEngineFactory.Route(_shardingDbContext.GetType(), _source);
}
//public StreamMergeContext(IQueryable<T> source,IEnumerable<RouteResult> routeResults,
// IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
@ -72,10 +74,6 @@ namespace ShardingCore.Sharding
var routeTail = _routeTailFactory.Create(routeResult);
return _shardingDbContext.GetDbContext(false, routeTail);
}
public IEnumerable<RouteResult> GetRouteResults()
{
return _tableRoutingRuleEngineFactory.Route(_shardingDbContext.GetType(),_source);
}
public IRouteTail Create(RouteResult routeResult)
{

View File

@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
}
_constantItem = (TEntity)constantExpression.Value;
}
protected override IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (!(secondExpression is ConstantExpression))
{

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
{
}
public override IQueryable EFQueryAfterFilter<TResult1>(IQueryable<TEntity> queryable)
public override IQueryable DoCombineQueryable<TResult1>(IQueryable<TEntity> queryable)
{
var secondExpression = GetSecondExpression();
if (secondExpression != null)
@ -41,7 +41,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
return queryable;
}
protected override IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> _queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> _queryable, Expression secondExpression)
{
return _queryable;
}

View File

@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
_methodCallExpression = methodCallExpression;
}
protected override IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, bool>> predicate)
{

View File

@ -20,7 +20,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
{
}
public override IQueryable EFQueryAfterFilter<TResult>(IQueryable<TEntity> queryable)
public override IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
{
var secondExpression = GetSecondExpression();
if (secondExpression != null)
@ -41,7 +41,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
return queryable;
}
protected override IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> _queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> _queryable, Expression secondExpression)
{
return _queryable;
}

View File

@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
_methodCallExpression = methodCallExpression;
}
protected override IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (secondExpression is UnaryExpression where)
{

View File

@ -30,28 +30,43 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
_methodCallExpression = methodCallExpression;
var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type))
?? throw new InvalidOperationException(methodCallExpression.ShardingPrint());
_queryable = new EnumerableQuery<TEntity>(expression);
_secondExpression = methodCallExpression.Arguments.FirstOrDefault(o => !typeof(IQueryable).IsAssignableFrom(o.Type));
if (_secondExpression != null)
if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2)
throw new ArgumentException($"argument count must 1 or 2 :[{methodCallExpression.ShardingPrint()}]");
for (int i = 0; i < methodCallExpression.Arguments.Count; i++)
{
_queryable = ProcessSecondExpression(_queryable, _secondExpression);
var expression = methodCallExpression.Arguments[i];
if (typeof(IQueryable).IsAssignableFrom(expression.Type))
{
if (_queryable != null)
throw new ArgumentException(
$"argument found more 1 IQueryable :[{methodCallExpression.ShardingPrint()}]");
_queryable = new EnumerableQuery<TEntity>(expression);
}
else
{
if (methodCallExpression.Arguments.Count == 2)
_secondExpression = expression;
}
}
if(_queryable==null)
throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]");
if (methodCallExpression.Arguments.Count ==2)
{
if(_secondExpression == null)
throw new InvalidOperationException(methodCallExpression.ShardingPrint());
_queryable = CombineQueryable(_queryable, _secondExpression);
}
}
_mergeContext = ShardingContainer.GetService<IStreamMergeContextFactory>().Create(_queryable, shardingDbContext);
_parllelDbbContexts = new List<DbContext>();
}
protected abstract IQueryable<TEntity> ProcessSecondExpression(IQueryable<TEntity> queryable, Expression secondExpression);
/// <summary>
/// 合并queryable
/// </summary>
/// <param name="queryable"></param>
/// <param name="secondExpression"></param>
/// <returns></returns>
protected abstract IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression);
private IQueryable CreateAsyncExecuteQueryable<TResult>(RouteResult routeResult)
{
@ -59,13 +74,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
_parllelDbbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<TEntity>) GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var newFilterQueryable = EFQueryAfterFilter<TResult>(newQueryable);
return newFilterQueryable;
var newCombineQueryable= DoCombineQueryable<TResult>(newQueryable);
return newCombineQueryable
;
}
public async Task<List<TResult>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var tableResult = _mergeContext.GetRouteResults();
var tableResult = _mergeContext.RouteResults;
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(async () =>
@ -73,7 +89,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
try
{
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(routeResult);
return await efQuery(asyncExecuteQueryable);
var queryResult= await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(routeResult, queryResult);
//}
}
catch (Exception e)
@ -111,7 +128,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
}
public virtual IQueryable EFQueryAfterFilter<TResult>(IQueryable<TEntity> queryable)
public virtual IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
{
return queryable;
}

View File

@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
public interface IInMemoryAsyncMergeEngine<TEntity>
{
StreamMergeContext<TEntity> GetStreamMergeContext();
Task<List<TResult>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery,
Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery,
CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
@ -29,9 +30,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override bool MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Any());
return result.All(o => o);
return AsyncHelper.RunSync(()=> MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())

View File

@ -1,10 +1,10 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Helpers;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
@ -24,16 +24,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override int MergeResult()
{
var result = base.Execute( queryable => ((IQueryable<TEntity>)queryable).Count());
return result.Sum();
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
return result.Sum();
return result.Sum(o=>o.QueryResult);
}
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/1 13:13:17
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class RouteQueryResult<TResult>
{
public RouteResult RouteResult { get; }
public TResult QueryResult { get; }
public RouteQueryResult(RouteResult routeResult,TResult queryResult)
{
RouteResult = routeResult;
QueryResult = queryResult;
}
}
}

View File

@ -15,7 +15,7 @@ namespace ShardingCore
/// </summary>
/// <param name="total">总记录数。</param>
/// <param name="data">当前页面的数据。</param>
public ShardingPagedResult(List<T> data, int total)
public ShardingPagedResult(List<T> data, long total)
{
this.Total = total;
this.Data = data;
@ -26,7 +26,7 @@ namespace ShardingCore
/// <summary>
/// 获取或设置总记录数。
/// </summary>
public int Total { get; set; }
public long Total { get; set; }
/// <summary>
/// 分页数据
/// </summary>