优化大量代码架构支持针对不需要聚合的数据由原生efcore接管[#83]

This commit is contained in:
xuejiaming 2021-12-19 22:30:48 +08:00
parent 357df30710
commit d6e5849837
65 changed files with 1030 additions and 508 deletions

View File

@ -64,6 +64,7 @@ namespace Sample.SqlServer.Controllers
resultx112331tt2xx.UserId = "xxxxx"; resultx112331tt2xx.UserId = "xxxxx";
var resultx112331 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync(); var resultx112331 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync();
var resultx11233411 = _defaultTableDbContext.Set<SysUserMod>().Count(); var resultx11233411 = _defaultTableDbContext.Set<SysUserMod>().Count();
var resultx11231xa = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198).Select(o => o.Id).ContainsAsync("198");
var resultx11231 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).ContainsAsync("1981"); var resultx11231 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).ContainsAsync("1981");
var resultx1121 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").SumAsync(o => o.Age); var resultx1121 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").SumAsync(o => o.Age);
var resultx111 = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "198"); var resultx111 = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "198");

View File

@ -17,10 +17,10 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
/// 分库路由上下文 /// 分库路由上下文
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
public class DataSourceRouteRuleContext<T> public class DataSourceRouteRuleContext
{ {
public ISet<Type> QueryEntities { get; } public ISet<Type> QueryEntities { get; }
public DataSourceRouteRuleContext(IQueryable<T> queryable,Type dbContextType) public DataSourceRouteRuleContext(IQueryable queryable,Type dbContextType)
{ {
Queryable = queryable; Queryable = queryable;
QueryEntities = queryable.ParseQueryableEntities(dbContextType); QueryEntities = queryable.ParseQueryableEntities(dbContextType);
@ -28,6 +28,6 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
/// <summary> /// <summary>
/// 查询条件 /// 查询条件
/// </summary> /// </summary>
public IQueryable<T> Queryable { get; } public IQueryable Queryable { get; }
} }
} }

View File

@ -29,7 +29,7 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
_virtualDataSource = virtualDataSource; _virtualDataSource = virtualDataSource;
_entityMetadataManager = entityMetadataManager; _entityMetadataManager = entityMetadataManager;
} }
public DataSourceRouteResult Route<T>(DataSourceRouteRuleContext<T> routeRuleContext) public DataSourceRouteResult Route(DataSourceRouteRuleContext routeRuleContext)
{ {
var dataSourceMaps = new Dictionary<Type, ISet<string>>(); var dataSourceMaps = new Dictionary<Type, ISet<string>>();
var notShardingDataSourceEntityType = routeRuleContext.QueryEntities.FirstOrDefault(o => !_entityMetadataManager.IsShardingDataSource(o)); var notShardingDataSourceEntityType = routeRuleContext.QueryEntities.FirstOrDefault(o => !_entityMetadataManager.IsShardingDataSource(o));

View File

@ -34,9 +34,9 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="queryable"></param> /// <param name="queryable"></param>
/// <returns></returns> /// <returns></returns>
public DataSourceRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable) public DataSourceRouteRuleContext CreateContext(IQueryable queryable)
{ {
return new DataSourceRouteRuleContext<T>(queryable,typeof(TShardingDbContext)); return new DataSourceRouteRuleContext(queryable,typeof(TShardingDbContext));
} }
/// <summary> /// <summary>
/// 路由到具体的物理数据源 /// 路由到具体的物理数据源
@ -44,9 +44,9 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="queryable"></param> /// <param name="queryable"></param>
/// <returns></returns> /// <returns></returns>
public DataSourceRouteResult Route<T>(IQueryable<T> queryable) public DataSourceRouteResult Route(IQueryable queryable)
{ {
var ruleContext = CreateContext<T>(queryable); var ruleContext = CreateContext(queryable);
return _dataSourceRouteRuleEngine.Route(ruleContext); return _dataSourceRouteRuleEngine.Route(ruleContext);
} }
/// <summary> /// <summary>
@ -55,7 +55,7 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="ruleContext"></param> /// <param name="ruleContext"></param>
/// <returns></returns> /// <returns></returns>
public DataSourceRouteResult Route<T>(DataSourceRouteRuleContext<T> ruleContext) public DataSourceRouteResult Route(DataSourceRouteRuleContext ruleContext)
{ {
return _dataSourceRouteRuleEngine.Route(ruleContext); return _dataSourceRouteRuleEngine.Route(ruleContext);
} }

View File

@ -15,7 +15,7 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
*/ */
public interface IDataSourceRouteRuleEngine public interface IDataSourceRouteRuleEngine
{ {
DataSourceRouteResult Route<T>(DataSourceRouteRuleContext<T> routeRuleContext); DataSourceRouteResult Route(DataSourceRouteRuleContext routeRuleContext);
} }
public interface IDataSourceRouteRuleEngine<TShardingDbContext> : IDataSourceRouteRuleEngine public interface IDataSourceRouteRuleEngine<TShardingDbContext> : IDataSourceRouteRuleEngine

View File

@ -14,12 +14,14 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public interface IDataSourceRouteRuleEngineFactory
public interface IDataSourceRouteRuleEngineFactory<TShardingDbContext> {
DataSourceRouteRuleContext CreateContext(IQueryable queryable);
DataSourceRouteResult Route(IQueryable queryable);
DataSourceRouteResult Route(DataSourceRouteRuleContext ruleContext);
}
public interface IDataSourceRouteRuleEngineFactory<TShardingDbContext> : IDataSourceRouteRuleEngineFactory
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
{ {
DataSourceRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable);
DataSourceRouteResult Route<T>(IQueryable<T> queryable);
DataSourceRouteResult Route<T>(DataSourceRouteRuleContext<T> ruleContext);
} }
} }

View File

@ -15,6 +15,6 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
public interface ITableRouteRuleEngine<TShardingDbContext> public interface ITableRouteRuleEngine<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
{ {
IEnumerable<TableRouteResult> Route<T>(TableRouteRuleContext<T> tableRouteRuleContext); IEnumerable<TableRouteResult> Route(TableRouteRuleContext tableRouteRuleContext);
} }
} }

View File

@ -12,11 +12,14 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
* @Date: Thursday, 28 January 2021 13:30:28 * @Date: Thursday, 28 January 2021 13:30:28
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public interface ITableRouteRuleEngineFactory<TShardingDbContext> public interface ITableRouteRuleEngineFactory
{
TableRouteRuleContext CreateContext(IQueryable queryable);
IEnumerable<TableRouteResult> Route(IQueryable queryable);
IEnumerable<TableRouteResult> Route(TableRouteRuleContext ruleContext);
}
public interface ITableRouteRuleEngineFactory<TShardingDbContext> : ITableRouteRuleEngineFactory
where TShardingDbContext : DbContext, IShardingDbContext where TShardingDbContext : DbContext, IShardingDbContext
{ {
TableRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable);
IEnumerable<TableRouteResult> Route<T>(IQueryable<T> queryable);
IEnumerable<TableRouteResult> Route<T>(TableRouteRuleContext<T> ruleContext);
} }
} }

View File

@ -10,15 +10,15 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
* @Date: Thursday, 28 January 2021 10:54:52 * @Date: Thursday, 28 January 2021 10:54:52
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class TableRouteRuleContext<T> public class TableRouteRuleContext
{ {
public TableRouteRuleContext(IQueryable<T> queryable) public TableRouteRuleContext(IQueryable queryable)
{ {
Queryable = queryable; Queryable = queryable;
} }
public IQueryable<T> Queryable { get; } public IQueryable Queryable { get; }
} }
} }

View File

@ -30,7 +30,7 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
_entityMetadataManager = entityMetadataManager; _entityMetadataManager = entityMetadataManager;
} }
public IEnumerable<TableRouteResult> Route<T>(TableRouteRuleContext<T> tableRouteRuleContext) public IEnumerable<TableRouteResult> Route(TableRouteRuleContext tableRouteRuleContext)
{ {
Dictionary<IVirtualTable, ISet<IPhysicTable>> routeMaps = new Dictionary<IVirtualTable, ISet<IPhysicTable>>(); Dictionary<IVirtualTable, ISet<IPhysicTable>> routeMaps = new Dictionary<IVirtualTable, ISet<IPhysicTable>>();
var queryEntities = tableRouteRuleContext.Queryable.ParseQueryableEntities(typeof(TShardingDbContext)); var queryEntities = tableRouteRuleContext.Queryable.ParseQueryableEntities(typeof(TShardingDbContext));

View File

@ -29,21 +29,20 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine
/// 创建表路由上下文 /// 创建表路由上下文
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="dsname"></param>
/// <param name="queryable"></param> /// <param name="queryable"></param>
/// <returns></returns> /// <returns></returns>
public TableRouteRuleContext<T> CreateContext<T>(IQueryable<T> queryable) public TableRouteRuleContext CreateContext(IQueryable queryable)
{ {
return new TableRouteRuleContext<T>(queryable); return new TableRouteRuleContext(queryable);
} }
public IEnumerable<TableRouteResult> Route<T>(IQueryable<T> queryable) public IEnumerable<TableRouteResult> Route(IQueryable queryable)
{ {
var ruleContext = CreateContext<T>(queryable); var ruleContext = CreateContext(queryable);
return _tableRouteRuleEngine.Route(ruleContext); return Route(ruleContext);
} }
public IEnumerable<TableRouteResult> Route<T>(TableRouteRuleContext<T> ruleContext) public IEnumerable<TableRouteResult> Route(TableRouteRuleContext ruleContext)
{ {
return _tableRouteRuleEngine.Route(ruleContext); return _tableRouteRuleEngine.Route(ruleContext);
} }

View File

@ -115,6 +115,7 @@ namespace ShardingCore
services.TryAddSingleton(typeof(IParallelTableManager<>), typeof(ParallelTableManager<>)); services.TryAddSingleton(typeof(IParallelTableManager<>), typeof(ParallelTableManager<>));
services.TryAddSingleton<IRouteTailFactory, RouteTailFactory>(); services.TryAddSingleton<IRouteTailFactory, RouteTailFactory>();
services.TryAddSingleton<IShardingComplierExecutor, DefaultShardingComplierExecutor>(); services.TryAddSingleton<IShardingComplierExecutor, DefaultShardingComplierExecutor>();
services.TryAddSingleton<IQueryCompilerContextFactory, QueryCompilerContextFactory>();
services.TryAddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>(); services.TryAddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();
services.TryAddSingleton<IReadWriteConnectorFactory, ReadWriteConnectorFactory>(); services.TryAddSingleton<IReadWriteConnectorFactory, ReadWriteConnectorFactory>();

View File

@ -69,7 +69,7 @@ namespace ShardingCore.Extensions
return expression.ToString(); return expression.ToString();
#endif #endif
} }
public static string ShardingPrint<T>(this IQueryable<T> queryable) public static string ShardingPrint(this IQueryable queryable)
{ {
return queryable.Expression.ShardingPrint(); return queryable.Expression.ShardingPrint();
} }

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.Abstractions
{
public interface IQueryCompilerContextFactory
{
IQueryCompilerContext Create<TResult>(IShardingDbContext shardingDbContext, Expression queryExpression,bool async);
}
}

View File

@ -3,6 +3,7 @@ using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.ShardingExecutors; using ShardingCore.Sharding.ShardingExecutors;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
#if EFCORE2 #if EFCORE2
using Microsoft.EntityFrameworkCore.Internal; using Microsoft.EntityFrameworkCore.Internal;
@ -22,16 +23,16 @@ namespace ShardingCore.Sharding.Abstractions
/// execute query /// execute query
/// </summary> /// </summary>
/// <typeparam name="TResult"></typeparam> /// <typeparam name="TResult"></typeparam>
/// <param name="queryCompilerContext"></param> /// <param name="mergeQueryCompilerContext"></param>
/// <returns></returns> /// <returns></returns>
TResult Execute<TResult>(QueryCompilerContext queryCompilerContext); TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext);
/// <summary> /// <summary>
/// execute query async /// execute query async
/// </summary> /// </summary>
/// <typeparam name="TResult"></typeparam> /// <typeparam name="TResult"></typeparam>
/// <param name="queryCompilerContext"></param> /// <param name="mergeQueryCompilerContext"></param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns></returns> /// <returns></returns>
TResult ExecuteAsync<TResult>(QueryCompilerContext queryCompilerContext, CancellationToken cancellationToken = new CancellationToken()); TResult ExecuteAsync<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, CancellationToken cancellationToken = new CancellationToken());
} }
} }

View File

@ -1,6 +1,7 @@
using System.Linq; using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.Abstractions namespace ShardingCore.Sharding.Abstractions
{ {
@ -12,7 +13,7 @@ namespace ShardingCore.Sharding.Abstractions
*/ */
public interface IStreamMergeContextFactory public interface IStreamMergeContextFactory
{ {
StreamMergeContext<T> Create<T>(IQueryable<T> queryable, IShardingDbContext shardingDbContext); StreamMergeContext<T> Create<T>(IMergeQueryCompilerContext mergeQueryCompilerContext);
} }
public interface IStreamMergeContextFactory<TShardingDbContext> : IStreamMergeContextFactory where TShardingDbContext:DbContext,IShardingDbContext public interface IStreamMergeContextFactory<TShardingDbContext> : IStreamMergeContextFactory where TShardingDbContext:DbContext,IShardingDbContext

View File

@ -1,46 +0,0 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/19 8:08:50
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
private readonly TEntity _constantItem;
protected AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
var secondExpression = GetSecondExpression();
if (!(secondExpression is ConstantExpression constantExpression))
{
throw new ShardingCoreException($"not found constant {methodCallExpression.ShardingPrint()}");
}
_constantItem = (TEntity)constantExpression.Value;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (!(secondExpression is ConstantExpression))
{
throw new ShardingCoreInvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
return queryable;
}
protected TEntity GetConstantItem()
{
return _constantItem;
}
}
}

View File

@ -15,7 +15,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
*/ */
internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult> internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
{ {
protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -4,6 +4,7 @@ using System.Linq.Expressions;
using ShardingCore.Exceptions; using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{ {
@ -17,28 +18,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
internal abstract class AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity,TResult,TSelect>: AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> internal abstract class AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity,TResult,TSelect>: AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{ {
protected AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) protected AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override IQueryable DoCombineQueryable<TResult1>(IQueryable<TEntity> queryable) public override IQueryable DoCombineQueryable<TResult1>(IQueryable<TEntity> queryable)
{ {
var secondExpression = GetSecondExpression(); var selectQueryCombineResult = (SelectQueryCombineResult)GetStreamMergeContext().MergeQueryCompilerContext.GetQueryCombineResult();
if (secondExpression != null) return selectQueryCombineResult.GetSelectCombineQueryable<TEntity, TSelect>(queryable);
{
if (secondExpression is UnaryExpression unaryExpression && unaryExpression.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, TSelect>> selector)
{
return queryable.Select(selector);
}
throw new ShardingCoreException($"expression is not selector:{secondExpression.ShardingPrint()}");
}
return queryable;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> _queryable, Expression secondExpression)
{
return _queryable;
} }
} }

View File

@ -1,35 +0,0 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/19 8:22:19
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
public AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, bool>> predicate)
{
return queryable.Where(predicate);
}
throw new ShardingCoreInvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
}
}

View File

@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
{ {
protected AbstractGenericMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) protected AbstractGenericMethodCallInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -4,6 +4,7 @@ using System.Linq.Expressions;
using ShardingCore.Exceptions; using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{ {
@ -16,28 +17,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
*/ */
internal abstract class AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity,TSelect>:AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity> internal abstract class AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity,TSelect>:AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity>
{ {
public AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable) public override IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
{ {
var secondExpression = GetSecondExpression(); var selectQueryCombineResult = (SelectQueryCombineResult)GetStreamMergeContext().MergeQueryCompilerContext.GetQueryCombineResult();
if (secondExpression != null) return selectQueryCombineResult.GetSelectCombineQueryable<TEntity, TSelect>(queryable);
{
if (secondExpression is UnaryExpression unaryExpression && unaryExpression.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, TSelect>> selector)
{
return queryable.Select(selector);
}
throw new ShardingCoreException($"expression is not selector:{secondExpression.ShardingPrint()}");
}
return queryable;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
return queryable;
} }
} }

View File

@ -1,37 +0,0 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/19 8:42:02
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity>: AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity>
{
public AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (secondExpression is UnaryExpression where)
{
if (@where.Operand is LambdaExpression lambdaExpression and Expression<Func<TEntity, bool>> predicate)
{
return queryable.Where(predicate);
}
}
throw new ShardingCoreInvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
}
}

View File

@ -14,9 +14,9 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
internal abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> where TShardingDbContext:DbContext,IShardingDbContext internal abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> : AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity> where TShardingDbContext:DbContext,IShardingDbContext
{ {
protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -26,52 +26,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
*/ */
internal abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IInMemoryAsyncMergeEngine<TEntity> internal abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IInMemoryAsyncMergeEngine<TEntity>
{ {
private readonly MethodCallExpression _methodCallExpression;
private readonly StreamMergeContext<TEntity> _mergeContext; private readonly StreamMergeContext<TEntity> _mergeContext;
private readonly IQueryable<TEntity> _queryable;
private readonly Expression _secondExpression;
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) public AbstractInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{ {
_methodCallExpression = methodCallExpression; _mergeContext = streamMergeContext;
if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2)
throw new ArgumentException($"argument count must 1 or 2 :[{methodCallExpression.ShardingPrint()}]");
for (int i = 0; i < methodCallExpression.Arguments.Count; i++)
{
var expression = methodCallExpression.Arguments[i];
if (typeof(IQueryable).IsAssignableFrom(expression.Type))
{
if (_queryable != null)
throw new ArgumentException(
$"argument found more 1 IQueryable :[{methodCallExpression.ShardingPrint()}]");
_queryable = new EnumerableQuery<TEntity>(expression);
} }
else
{
_secondExpression = expression;
}
}
if (_queryable == null)
throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]");
if (methodCallExpression.Arguments.Count == 2)
{
if (_secondExpression == null)
throw new ShardingCoreInvalidOperationException(methodCallExpression.ShardingPrint());
// ReSharper disable once VirtualMemberCallInConstructor
_queryable = CombineQueryable(_queryable, _secondExpression);
}
_mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext);
}
/// <summary>
/// 合并queryable
/// </summary>
/// <param name="queryable"></param>
/// <param name="secondExpression"></param>
/// <returns></returns>
protected abstract IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression);
private (IQueryable queryable, DbContext dbContext) CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode) private (IQueryable queryable, DbContext dbContext) CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{ {
@ -133,20 +93,5 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{ {
return _mergeContext; return _mergeContext;
} }
//public IQueryable<TEntity> GetQueryable()
//{
// return _queryable;
//}
protected MethodCallExpression GetMethodCallExpression()
{
return _methodCallExpression;
}
protected Expression GetSecondExpression()
{
return _secondExpression;
}
} }
} }

View File

@ -24,8 +24,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
internal class AverageAsyncInMemoryMergeEngine<TEntity, TEnsureResult,TSelect> : internal class AverageAsyncInMemoryMergeEngine<TEntity, TEnsureResult,TSelect> :
AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TEnsureResult, TSelect> AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TEnsureResult, TSelect>
{ {
public AverageAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, public AverageAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
@ -53,7 +52,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var resultType = typeof(T); var resultType = typeof(T);
if (!resultType.IsNumericType()) if (!resultType.IsNumericType())
throw new ShardingCoreException( throw new ShardingCoreException(
$"not support {GetMethodCallExpression().ShardingPrint()} result {resultType}"); $"not support {GetStreamMergeContext().MergeQueryCompilerContext.GetQueryExpression().ShardingPrint()} result {resultType}");
#if !EFCORE2 #if !EFCORE2
return await ShardingEntityFrameworkQueryableExtensions.ExecuteAsync<T, Task<T>>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<T>)queryable, (Expression)null, cancellationToken); return await ShardingEntityFrameworkQueryableExtensions.ExecuteAsync<T, Task<T>>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<T>)queryable, (Expression)null, cancellationToken);
#endif #endif
@ -69,7 +68,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
if (!typeof(TSelect).IsNumericType()) if (!typeof(TSelect).IsNumericType())
{ {
throw new ShardingCoreException( throw new ShardingCoreException(
$"not support {GetMethodCallExpression().ShardingPrint()} result {typeof(TSelect)}"); $"not support {GetStreamMergeContext().MergeQueryCompilerContext.GetQueryExpression().ShardingPrint()} result {typeof(TSelect)}");
} }
var result = await AggregateAverageResultAsync<TSelect>(cancellationToken); var result = await AggregateAverageResultAsync<TSelect>(cancellationToken);
if (result.IsEmpty()) if (result.IsEmpty())

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
*/ */
internal class MaxAsyncInMemoryMergeEngine<TEntity, TSelect> : AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TSelect> internal class MaxAsyncInMemoryMergeEngine<TEntity, TSelect> : AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TSelect>
{ {
public MaxAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public MaxAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
private TResult GetMaxTResult<TInnerSelect, TResult>(List<RouteQueryResult<TInnerSelect>> source) private TResult GetMaxTResult<TInnerSelect, TResult>(List<RouteQueryResult<TInnerSelect>> source)

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
*/ */
internal class MinAsyncInMemoryMergeEngine<TEntity, TSelect> : AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TSelect> internal class MinAsyncInMemoryMergeEngine<TEntity, TSelect> : AbstractGenericMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TSelect>
{ {
public MinAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public MinAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -21,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
*/ */
internal class SumAsyncInMemoryMergeEngine<TEntity, TEnsureResult> : AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TEnsureResult,TEnsureResult> internal class SumAsyncInMemoryMergeEngine<TEntity, TEnsureResult> : AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine<TEntity, TEnsureResult,TEnsureResult>
{ {
public SumAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public SumAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
@ -30,7 +30,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var resultType = typeof(TEnsureResult); var resultType = typeof(TEnsureResult);
if(!resultType.IsNumericType()) if(!resultType.IsNumericType())
throw new ShardingCoreException( throw new ShardingCoreException(
$"not support {GetMethodCallExpression().ShardingPrint()} result {resultType}"); $"not support {GetStreamMergeContext().MergeQueryCompilerContext.GetQueryExpression().ShardingPrint()} result {resultType}");
#if !EFCORE2 #if !EFCORE2
var result = await base.ExecuteAsync(queryable => ShardingEntityFrameworkQueryableExtensions.ExecuteAsync<TEnsureResult, Task<TEnsureResult>>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<TEnsureResult>)queryable, (Expression)null, cancellationToken), cancellationToken); var result = await base.ExecuteAsync(queryable => ShardingEntityFrameworkQueryableExtensions.ExecuteAsync<TEnsureResult, Task<TEnsureResult>>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<TEnsureResult>)queryable, (Expression)null, cancellationToken), cancellationToken);
return GetSumResult(result); return GetSumResult(result);

View File

@ -6,6 +6,7 @@ using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines namespace ShardingCore.Sharding.StreamMergeEngines
{ {
@ -18,27 +19,26 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class AllAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, bool> internal class AllAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, bool>
{ {
public AllAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public AllAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AllAsync(_predicate, cancellationToken), cancellationToken); var result = await base.ExecuteAsync(queryable =>
{
var allQueryCombineResult = (AllQueryCombineResult)GetStreamMergeContext().MergeQueryCompilerContext.GetQueryCombineResult();
Expression<Func<TEntity, bool>> allPredicate = x => true;
var predicate = allQueryCombineResult.GetAllPredicate();
if (predicate != null)
{
allPredicate = (Expression<Func<TEntity, bool>>)predicate;
}
return ((IQueryable<TEntity>)queryable).AllAsync(allPredicate, cancellationToken);
}, cancellationToken);
return result.All(o => o.QueryResult); return result.All(o => o.QueryResult);
} }
private Expression<Func<TEntity, bool>> _predicate;
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, bool>> predicate)
{
_predicate=predicate;
}
return queryable;
}
} }
} }

View File

@ -15,9 +15,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
internal class AnyAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity,bool> internal class AnyAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,bool>
{ {
public AnyAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public AnyAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines namespace ShardingCore.Sharding.StreamMergeEngines
{ {
@ -15,16 +16,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
internal class ContainsAsyncInMemoryMergeEngine<TEntity>: AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine<TEntity,bool> internal class ContainsAsyncInMemoryMergeEngine<TEntity>: AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,bool>
{ {
public ContainsAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public ContainsAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken); var result = await base.ExecuteAsync(queryable =>
{
var constantQueryCombineResult = (ConstantQueryCombineResult)GetStreamMergeContext().MergeQueryCompilerContext.GetQueryCombineResult();
var constantItem = (TEntity)constantQueryCombineResult.GetConstantItem();
return ((IQueryable<TEntity>)queryable).ContainsAsync(constantItem, cancellationToken);
}, cancellationToken);
return result.Any(o => o.QueryResult); return result.Any(o => o.QueryResult);
} }

View File

@ -16,10 +16,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
internal class CountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity,int> internal class CountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,int>
{ {
private readonly IShardingPageManager _shardingPageManager; private readonly IShardingPageManager _shardingPageManager;
public CountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public CountAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>(); _shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
} }
@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync(queryable =>((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
if (_shardingPageManager.Current != null) if (_shardingPageManager.Current != null)
{ {

View File

@ -19,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class FirstAsyncInMemoryMergeEngine<TShardingDbContext,TEntity>: AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> where TShardingDbContext:DbContext,IShardingDbContext internal class FirstAsyncInMemoryMergeEngine<TShardingDbContext,TEntity>: AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> where TShardingDbContext:DbContext,IShardingDbContext
{ {
public FirstAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public FirstAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class FirstOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext internal class FirstOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public FirstOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public FirstOrDefaultAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())

View File

@ -19,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class LastAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext internal class LastAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public LastAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public LastAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())

View File

@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class LastOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext internal class LastOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public LastOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public LastOrDefaultAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -16,10 +16,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
internal class LongCountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity,long> internal class LongCountAsyncInMemoryMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,long>
{ {
private readonly IShardingPageManager _shardingPageManager; private readonly IShardingPageManager _shardingPageManager;
public LongCountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public LongCountAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
_shardingPageManager= ShardingContainer.GetService<IShardingPageManager>(); _shardingPageManager= ShardingContainer.GetService<IShardingPageManager>();
} }

View File

@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class SingleAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext internal class SingleAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public SingleAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public SingleAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -19,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
internal class SingleOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext internal class SingleOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public SingleOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public SingleOrDefaultAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{ {
} }

View File

@ -0,0 +1,66 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
{
public abstract class AbstractQueryableCombine:IQueryableCombine
{
public QueryCombineResult Combine(IQueryCompilerContext queryCompilerContext, Type queryEntityType)
{
if (!(queryCompilerContext.GetQueryExpression() is MethodCallExpression methodCallExpression))
throw new InvalidOperationException($"{nameof(queryCompilerContext)}'s is not {nameof(MethodCallExpression)}");
if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2)
throw new ArgumentException($"argument count must 1 or 2 :[{methodCallExpression.ShardingPrint()}]");
IQueryable queryable = null;
Expression secondExpression = null;
for (int i = 0; i < methodCallExpression.Arguments.Count; i++)
{
var expression = methodCallExpression.Arguments[i];
if (typeof(IQueryable).IsAssignableFrom(expression.Type))
{
if (queryable != null)
throw new ArgumentException(
$"argument found more one IQueryable :[{methodCallExpression.ShardingPrint()}]");
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
queryable = (IQueryable)Activator.CreateInstance(type, expression);
}
else
{
secondExpression = expression;
}
}
if (queryable == null)
throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]");
if (methodCallExpression.Arguments.Count == 2)
{
if (secondExpression == null)
throw new ShardingCoreInvalidOperationException(methodCallExpression.ShardingPrint());
// ReSharper disable once VirtualMemberCallInConstructor
queryable = DoCombineQueryable(queryable, secondExpression, queryCompilerContext);
}
return GetDefaultQueryCombineResult(queryable, secondExpression, queryCompilerContext);
}
public virtual QueryCombineResult GetDefaultQueryCombineResult(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
return new QueryCombineResult(queryable,queryCompilerContext);
}
public abstract IQueryable DoCombineQueryable(IQueryable queryable,Expression secondExpression, IQueryCompilerContext queryCompilerContext);
public Type GetQueryEntityType()
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
{
public interface IMergeQueryCompilerContext : IQueryCompilerContext
{
QueryCombineResult GetQueryCombineResult();
Type GetQueryEntityType();
IEnumerable<TableRouteResult> GetTableRouteResults();
DataSourceRouteResult GetDataSourceRouteResult();
bool IsMergeQuery();
bool IsCrossTable();
bool IsCrossDataSource();
bool IsEnumerableQuery();
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
{
public interface IQueryCompilerContext
{
ISet<Type> GetQueryEntities();
IShardingDbContext GetShardingDbContext();
Expression GetQueryExpression();
IEntityMetadataManager GetEntityMetadataManager();
Type GetShardingDbContextType();
QueryCompilerExecutor GetQueryCompilerExecutor();
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
{
public interface IQueryableCombine
{
QueryCombineResult Combine(IQueryCompilerContext queryCompilerContext,Type queryEntityType);
}
}

View File

@ -7,8 +7,10 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal; using Microsoft.EntityFrameworkCore.Internal;
using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Utils; using ShardingCore.Utils;
namespace ShardingCore.Sharding.ShardingExecutors namespace ShardingCore.Sharding.ShardingExecutors
@ -16,20 +18,26 @@ namespace ShardingCore.Sharding.ShardingExecutors
public class DefaultShardingComplierExecutor: IShardingComplierExecutor public class DefaultShardingComplierExecutor: IShardingComplierExecutor
{ {
private readonly IShardingQueryExecutor _shardingQueryExecutor; private readonly IShardingQueryExecutor _shardingQueryExecutor;
private readonly IQueryCompilerContextFactory _queryCompilerContextFactory;
public DefaultShardingComplierExecutor(IShardingQueryExecutor shardingQueryExecutor) public DefaultShardingComplierExecutor(IShardingQueryExecutor shardingQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory)
{ {
_shardingQueryExecutor = shardingQueryExecutor; _shardingQueryExecutor = shardingQueryExecutor;
_queryCompilerContextFactory = queryCompilerContextFactory;
} }
public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query) public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{ {
var queryCompilerContext = QueryCompilerContext.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create<TResult>(shardingDbContext, query, false);
var queryCompilerIfNoShardingQuery = queryCompilerContext.GetQueryCompiler(); var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerIfNoShardingQuery != null) if (queryCompilerExecutor != null)
{ {
return queryCompilerIfNoShardingQuery.Execute<TResult>(queryCompilerContext.NewQueryExpression()); return queryCompilerExecutor.GetQueryCompiler().Execute<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
} }
return _shardingQueryExecutor.Execute<TResult>(queryCompilerContext);
if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.Execute<TResult>(mergeCompilerContext);
} }
#if !EFCORE2 #if !EFCORE2
@ -37,38 +45,47 @@ namespace ShardingCore.Sharding.ShardingExecutors
public TResult ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, public TResult ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query,
CancellationToken cancellationToken = new CancellationToken()) CancellationToken cancellationToken = new CancellationToken())
{ {
var queryCompilerContext = QueryCompilerContext.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create<TResult>(shardingDbContext, query, true);
var queryCompilerIfNoShardingQuery = queryCompilerContext.GetQueryCompiler(); var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerIfNoShardingQuery != null) if (queryCompilerExecutor != null)
{ {
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(queryCompilerContext.NewQueryExpression(), cancellationToken); return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
} }
return _shardingQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext, cancellationToken); if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<TResult>(mergeCompilerContext, cancellationToken);
} }
#endif #endif
#if EFCORE2 #if EFCORE2
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query) public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query)
{ {
var queryCompilerContext = QueryCompilerContext.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create<IAsyncEnumerable<TResult>>(shardingDbContext, query, true);
var queryCompilerIfNoShardingQuery = queryCompilerContext.GetQueryCompiler(); var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerIfNoShardingQuery != null) if (queryCompilerExecutor != null)
{ {
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(queryCompilerContext.NewQueryExpression()); return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
} }
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(queryCompilerContext); if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(mergeCompilerContext);
} }
public Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, public Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var queryCompilerContext = QueryCompilerContext.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create<Task<TResult>>(shardingDbContext, query, true);
var queryCompilerIfNoShardingQuery = queryCompilerContext.GetQueryCompiler(); var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerIfNoShardingQuery != null) if (queryCompilerExecutor != null)
{ {
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(queryCompilerContext.NewQueryExpression(), cancellationToken); return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
} }
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(queryCompilerContext, cancellationToken); if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(mergeCompilerContext, cancellationToken);
} }
#endif #endif
} }

View File

@ -12,6 +12,7 @@ using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
#if EFCORE2 #if EFCORE2
using Microsoft.EntityFrameworkCore.Internal; using Microsoft.EntityFrameworkCore.Internal;
#endif #endif
@ -28,149 +29,112 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
public class DefaultShardingQueryExecutor : IShardingQueryExecutor public class DefaultShardingQueryExecutor : IShardingQueryExecutor
{ {
public TResult Execute<TResult>(QueryCompilerContext queryCompilerContext) public TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
{ {
//如果根表达式为iqueryable表示需要迭代 //如果根表达式为iqueryable表示需要迭代
if (queryCompilerContext.QueryExpression.Type.HasImplementedRawGeneric(typeof(IQueryable<>))) if (mergeQueryCompilerContext.IsEnumerableQuery())
{ {
return EnumerableExecute<TResult>(queryCompilerContext, false); return EnumerableExecute<TResult>(mergeQueryCompilerContext);
} }
return DoExecute<TResult>(queryCompilerContext, false, default); return DoExecute<TResult>(mergeQueryCompilerContext, false, default);
} }
public TResult ExecuteAsync<TResult>(QueryCompilerContext queryCompilerContext, CancellationToken cancellationToken = new CancellationToken()) public TResult ExecuteAsync<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, CancellationToken cancellationToken = new CancellationToken())
{ {
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>))) if (mergeQueryCompilerContext.IsEnumerableQuery())
{ {
return EnumerableExecute<TResult>(mergeQueryCompilerContext);
return EnumerableExecute<TResult>(queryCompilerContext, true);
} }
if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>))) if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>)))
{ {
return DoExecute<TResult>(queryCompilerContext, true, default); return DoExecute<TResult>(mergeQueryCompilerContext, true, default);
} }
throw new ShardingCoreException($"db context operator not support query expression:[{queryCompilerContext.QueryExpression.ShardingPrint()}] result type:[{typeof(TResult).FullName}]"); throw new ShardingCoreException($"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
} }
private TResult DoExecute<TResult>(QueryCompilerContext queryCompilerContext, bool async, CancellationToken cancellationToken = new CancellationToken()) private TResult DoExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken = new CancellationToken())
{ {
if (queryCompilerContext.QueryExpression is MethodCallExpression methodCallExpression) if (mergeQueryCompilerContext.GetQueryExpression() is MethodCallExpression methodCallExpression)
{ {
switch (methodCallExpression.Method.Name) switch (methodCallExpression.Method.Name)
{ {
case nameof(Enumerable.First): case nameof(Enumerable.First):
return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault): case nameof(Enumerable.FirstOrDefault):
return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Last): case nameof(Enumerable.Last):
return GenericShardingDbContextMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.LastOrDefault): case nameof(Enumerable.LastOrDefault):
return GenericShardingDbContextMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Single): case nameof(Enumerable.Single):
return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault): case nameof(Enumerable.SingleOrDefault):
return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Count): case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.LongCount): case nameof(Enumerable.LongCount):
return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Any): case nameof(Enumerable.Any):
return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.All): case nameof(Enumerable.All):
return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Max): case nameof(Enumerable.Max):
return GenericMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Min): case nameof(Enumerable.Min):
return GenericMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return GenericMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Sum): case nameof(Enumerable.Sum):
return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Average): case nameof(Enumerable.Average):
return EnsureMergeExecute3<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,,>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute3<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,,>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.Contains): case nameof(Enumerable.Contains):
return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), queryCompilerContext.ShardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
} }
} }
throw new ShardingCoreException($"db context operator not support query expression:[{queryCompilerContext.QueryExpression.ShardingPrint()}] result type:[{typeof(TResult).FullName}]"); throw new ShardingCoreException($"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
} }
private TResult EnumerableExecute<TResult>(QueryCompilerContext queryCompilerContext, bool async)
private object GetStreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext)
{ {
Type queryEntityType; var queryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = queryCompilerContext.QueryExpression.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, queryCompilerContext.QueryExpression);
var streamMergeContextFactory = (IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(queryCompilerContext.ShardingDbContextType)); Type queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
// private readonly IStreamMergeContextFactory _streamMergeContextFactory;
var streamMergeContextFactory = (IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(mergeQueryCompilerContext.GetShardingDbContextType()));
var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod(nameof(IStreamMergeContextFactory.Create)); var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod(nameof(IStreamMergeContextFactory.Create));
if (streamMergeContextMethod == null) if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); throw new ShardingCoreException($"cant found IStreamMergeContextFactory method [{nameof(IStreamMergeContextFactory.Create)}]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamMergeContextFactory, new[] { queryable, queryCompilerContext.ShardingDbContext }); return streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamMergeContextFactory, new object[] { mergeQueryCompilerContext });
}
private TResult EnumerableExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
Type queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<,>); Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<,>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryCompilerContext.ShardingDbContextType, queryEntityType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(mergeQueryCompilerContext.GetShardingDbContextType(), queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
} }
private TResult GenericShardingDbContextMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) private TResult GenericShardingDbContextMergeExecute<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
{ {
//{
// var startNew1 = Stopwatch.StartNew(); var queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
// var queryEntityType = query.GetQueryEntityType(); var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
// var newStreamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType); var newStreamMergeEngineType = streamMergeEngineType.MakeGenericType(mergeQueryCompilerContext.GetShardingDbContextType(), queryEntityType);
var streamEngine = Activator.CreateInstance(newStreamMergeEngineType, streamMergeContext);
// //{
// // //获取所有需要路由的表后缀
// // var startNew = Stopwatch.StartNew();
// // for (int i = 0; i < 10000; i++)
// // {
// // var streamEngine = ShardingCreatorHelper.CreateInstance(newStreamMergeEngineType, query, shardingDbContext);
// // }
// // startNew.Stop();
// // var x = startNew.ElapsedMilliseconds;
// //}
// {
// for (int i = 0; i < 10; i++)
// {
// var streamEngine = Activator.CreateInstance(typeof(AAA), shardingDbContext, query);
// }
// }
// var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
// var streamEngineMethod = newStreamMergeEngineType.GetMethod(methodName);
// if (streamEngineMethod == null)
// throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
// var @params = async ? new object[] { cancellationToken } : new object[0];
// startNew1.Stop();
// var x = startNew1.ElapsedMilliseconds;
// Console.WriteLine("----------------------"+x);
//}
{
var queryEntityType = query.GetQueryEntityType();
var newStreamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType);
var streamEngine = Activator.CreateInstance(newStreamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = newStreamMergeEngineType.GetMethod(methodName); var streamEngineMethod = newStreamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null) if (streamEngineMethod == null)
@ -178,13 +142,13 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
var @params = async ? new object[] { cancellationToken } : new object[0]; var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params); return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
} }
} private TResult GenericMergeExecute2<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
private TResult GenericMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{ {
var queryEntityType = query.GetQueryEntityType(); var queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
var resultType = query.GetResultType(); var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
var resultType = (mergeQueryCompilerContext.GetQueryExpression() as MethodCallExpression).GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, resultType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, resultType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null) if (streamEngineMethod == null)
@ -196,10 +160,12 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
} }
private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
{ {
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType()); var queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var methodName = async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult); var methodName = async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
if (streamEngineMethod == null) if (streamEngineMethod == null)
@ -208,13 +174,15 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
return (TResult)streamEngineMethod.Invoke(streamEngine, @params); return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
} }
private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
{ {
var queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
if (async) if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult).GetGenericArguments()[0]);
else else
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult)); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var methodName = async var methodName = async
? nameof(IEnsureMergeResult<object>.MergeResultAsync) ? nameof(IEnsureMergeResult<object>.MergeResultAsync)
: nameof(IEnsureMergeResult<object>.MergeResult); : nameof(IEnsureMergeResult<object>.MergeResult);
@ -224,14 +192,16 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
var @params = async ? new object[] { cancellationToken } : new object[0]; var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params); return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
} }
private TResult EnsureMergeExecute3<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) private TResult EnsureMergeExecute3<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
{ {
var resultType = query.GetResultType(); var queryEntityType = mergeQueryCompilerContext.GetQueryEntityType();
var resultType = (mergeQueryCompilerContext.GetQueryExpression() as MethodCallExpression).GetResultType();
if (async) if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0], resultType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult).GetGenericArguments()[0], resultType);
else else
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult), resultType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult), resultType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var methodName = async var methodName = async
? nameof(IEnsureMergeResult<object>.MergeResultAsync) ? nameof(IEnsureMergeResult<object>.MergeResultAsync)
: nameof(IEnsureMergeResult<object>.MergeResult); : nameof(IEnsureMergeResult<object>.MergeResult);

View File

@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.ShardingExecutors
{
public class MergeQueryCompilerContext : IMergeQueryCompilerContext
{
private readonly IParallelTableManager _parallelTableManager;
private readonly IQueryCompilerContext _queryCompilerContext;
private readonly QueryCombineResult _queryCombineResult;
private readonly Type _queryEntityType;
private readonly DataSourceRouteResult _dataSourceRouteResult;
private readonly bool _isEnumerableQuery;
private readonly IEnumerable<TableRouteResult> _tableRouteResults;
/// <summary>
/// 本次查询跨库
/// </summary>
public readonly bool _isCrossDataSource;
/// <summary>
/// 本次查询跨表
/// </summary>
public readonly bool _isCrossTable;
private QueryCompilerExecutor _queryCompilerExecutor;
private bool? hasQueryCompilerExecutor;
private MergeQueryCompilerContext(IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult,Type queryEntityType, DataSourceRouteResult dataSourceRouteResult, IEnumerable<TableRouteResult> tableRouteResults,bool isEnumerableQuery)
{
_queryCompilerContext = queryCompilerContext;
_queryCombineResult = queryCombineResult;
_queryEntityType = queryEntityType;
_parallelTableManager = (IParallelTableManager)ShardingContainer.GetService(typeof(IParallelTableManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
_dataSourceRouteResult = dataSourceRouteResult;
_isEnumerableQuery = isEnumerableQuery;
_tableRouteResults = GetTableRouteResults(tableRouteResults);
_isCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1;
_isCrossTable = _tableRouteResults.Count() > 1;
}
private IEnumerable<TableRouteResult> GetTableRouteResults(IEnumerable<TableRouteResult> tableRouteResults)
{
if (_queryCompilerContext.GetQueryEntities().Count > 1)
{
var entityMetadataManager = _queryCompilerContext.GetEntityMetadataManager();
var queryShardingTables = _queryCompilerContext.GetQueryEntities().Where(o => entityMetadataManager.IsShardingTable(o)).ToArray();
if (queryShardingTables.Length > 1 && _parallelTableManager.IsParallelTableQuery(queryShardingTables))
{
return tableRouteResults.Where(o => o.ReplaceTables.Select(p => p.Tail).ToHashSet().Count == 1);
}
}
return tableRouteResults;
}
public static MergeQueryCompilerContext Create(IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult, Type queryEntityType, DataSourceRouteResult dataSourceRouteResult,IEnumerable<TableRouteResult> tableRouteResults,bool isEnumerableQuery)
{
return new MergeQueryCompilerContext(queryCompilerContext, queryCombineResult, queryEntityType,dataSourceRouteResult, tableRouteResults, isEnumerableQuery);
}
public ISet<Type> GetQueryEntities()
{
return _queryCompilerContext.GetQueryEntities();
}
public IShardingDbContext GetShardingDbContext()
{
return _queryCompilerContext.GetShardingDbContext();
}
public Expression GetQueryExpression()
{
return _queryCompilerContext.GetQueryExpression();
}
public IEntityMetadataManager GetEntityMetadataManager()
{
return _queryCompilerContext.GetEntityMetadataManager();
}
public Type GetShardingDbContextType()
{
return _queryCompilerContext.GetShardingDbContextType();
}
public QueryCompilerExecutor GetQueryCompilerExecutor()
{
if (!hasQueryCompilerExecutor.HasValue)
{
if (!IsMergeQuery())
{
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var dbContext = GetShardingDbContext().GetDbContext(_dataSourceRouteResult.IntersectDataSources.First(), false, routeTailFactory.Create(_tableRouteResults.First()));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, GetQueryExpression());
}
}
return _queryCompilerExecutor;
}
public QueryCombineResult GetQueryCombineResult()
{
return _queryCombineResult;
}
public Type GetQueryEntityType()
{
return _queryEntityType;
}
public IEnumerable<TableRouteResult> GetTableRouteResults()
{
return _tableRouteResults;
}
public DataSourceRouteResult GetDataSourceRouteResult()
{
return _dataSourceRouteResult;
}
public bool IsMergeQuery()
{
return _isCrossDataSource || _isCrossTable;
}
public bool IsCrossTable()
{
return _isCrossTable;
}
public bool IsCrossDataSource()
{
return _isCrossDataSource;
}
public bool IsEnumerableQuery()
{
return _isEnumerableQuery;
}
}
}

View File

@ -12,26 +12,28 @@ using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Utils; using ShardingCore.Utils;
namespace ShardingCore.Sharding.ShardingExecutors namespace ShardingCore.Sharding.ShardingExecutors
{ {
public class QueryCompilerContext public class QueryCompilerContext: IQueryCompilerContext
{ {
public ISet<Type> QueryEntities { get; } private readonly ISet<Type> _queryEntities;
public IShardingDbContext ShardingDbContext { get; } private readonly IShardingDbContext _shardingDbContext;
public Expression QueryExpression { get; } private readonly Expression _queryExpression;
public IEntityMetadataManager EntityMetadataManager { get; } private readonly IEntityMetadataManager _entityMetadataManager;
public Type ShardingDbContextType { get; } private readonly Type _shardingDbContextType;
private DbContext realDbContext; private QueryCompilerExecutor _queryCompilerExecutor;
private bool? hasQueryCompilerExecutor;
private QueryCompilerContext( IShardingDbContext shardingDbContext, Expression queryExpression) private QueryCompilerContext( IShardingDbContext shardingDbContext, Expression queryExpression)
{ {
ShardingDbContextType = shardingDbContext.GetType(); _shardingDbContextType = shardingDbContext.GetType();
QueryEntities = ShardingUtil.GetQueryEntitiesByExpression(queryExpression, ShardingDbContextType); _queryEntities = ShardingUtil.GetQueryEntitiesByExpression(queryExpression, _shardingDbContextType);
ShardingDbContext = shardingDbContext; _shardingDbContext = shardingDbContext;
QueryExpression = queryExpression; _queryExpression = queryExpression;
EntityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(ShardingDbContextType)); _entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(_shardingDbContextType));
} }
public static QueryCompilerContext Create(IShardingDbContext shardingDbContext, Expression queryExpression) public static QueryCompilerContext Create(IShardingDbContext shardingDbContext, Expression queryExpression)
@ -39,23 +41,46 @@ namespace ShardingCore.Sharding.ShardingExecutors
return new QueryCompilerContext(shardingDbContext, queryExpression); return new QueryCompilerContext(shardingDbContext, queryExpression);
} }
public IQueryCompiler GetQueryCompiler() public ISet<Type> GetQueryEntities()
{ {
if (QueryEntities.All(o => !EntityMetadataManager.IsSharding(o))) return _queryEntities;
}
public IShardingDbContext GetShardingDbContext()
{
return _shardingDbContext;
}
public Expression GetQueryExpression()
{
return _queryExpression;
}
public IEntityMetadataManager GetEntityMetadataManager()
{
return _entityMetadataManager;
}
public Type GetShardingDbContextType()
{
return _shardingDbContextType;
}
public QueryCompilerExecutor GetQueryCompilerExecutor()
{
if (!hasQueryCompilerExecutor.HasValue)
{
if (_queryEntities.All(o => !_entityMetadataManager.IsSharding(o)))
{ {
var virtualDataSource = (IVirtualDataSource)ShardingContainer.GetService( var virtualDataSource = (IVirtualDataSource)ShardingContainer.GetService(
typeof(IVirtualDataSource<>).GetGenericType0(ShardingDbContextType)); typeof(IVirtualDataSource<>).GetGenericType0(_shardingDbContextType));
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>(); var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
realDbContext = ShardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, false, routeTailFactory.Create(string.Empty)); var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, false, routeTailFactory.Create(string.Empty));
return realDbContext.GetService<IQueryCompiler>(); _queryCompilerExecutor = new QueryCompilerExecutor(dbContext, _queryExpression);
}
} }
return null; return _queryCompilerExecutor;
}
public Expression NewQueryExpression()
{
return QueryExpression.ReplaceDbContextExpression(realDbContext);
} }
} }
} }

View File

@ -0,0 +1,122 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.ShardingExecutors
{
public class QueryCompilerContextFactory: IQueryCompilerContextFactory
{
private static readonly IQueryableCombine _enumeratorQueryableCombine;
private static readonly IQueryableCombine _allQueryableCombine;
private static readonly IQueryableCombine _constantQueryableCombine;
private static readonly IQueryableCombine _selectQueryableCombine;
private static readonly IQueryableCombine _whereQueryableCombine;
static QueryCompilerContextFactory()
{
_enumeratorQueryableCombine = new EnumeratorQueryableCombine();
_allQueryableCombine = new AllQueryableCombine();
_constantQueryableCombine = new ConstantQueryableCombine();
_selectQueryableCombine = new SelectQueryableCombine();
_whereQueryableCombine = new WhereQueryableCombine();
}
public IQueryCompilerContext Create<TResult>(IShardingDbContext shardingDbContext, Expression queryExpression,bool async)
{
IQueryCompilerContext queryCompilerContext =
QueryCompilerContext.Create(shardingDbContext, queryExpression);
if (queryCompilerContext.GetQueryCompilerExecutor() is not null)
{
return queryCompilerContext;
}
var (queryEntityType, queryableCombine, isEnumerableQuery) = GetQueryableCombine<TResult>(queryCompilerContext, async);
var dataSourceRouteRuleEngineFactory = (IDataSourceRouteRuleEngineFactory)ShardingContainer.GetService(typeof(IDataSourceRouteRuleEngineFactory<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
var tableRouteRuleEngineFactory = (ITableRouteRuleEngineFactory)ShardingContainer.GetService(typeof(ITableRouteRuleEngineFactory<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
var queryCombineResult = queryableCombine.Combine(queryCompilerContext, queryEntityType);
var dataSourceRouteResult = dataSourceRouteRuleEngineFactory.Route(queryCombineResult.GetCombineQueryable());
var tableRouteResults = tableRouteRuleEngineFactory.Route(queryCombineResult.GetCombineQueryable());
var mergeCombineCompilerContext = MergeQueryCompilerContext.Create(queryCompilerContext, queryCombineResult, queryEntityType, dataSourceRouteResult,
tableRouteResults, isEnumerableQuery);
return mergeCombineCompilerContext;
}
private (Type queryEntityType, IQueryableCombine queryableCombine,bool isEnumerableQuery) GetQueryableCombine<TResult>(IQueryCompilerContext queryCompilerContext, bool async)
{
var isEnumerableQuery = IsEnumerableQuery<TResult>(queryCompilerContext,async);
if (isEnumerableQuery)
{
var queryEntityType = GetEnumerableQueryEntityType<TResult>(queryCompilerContext,async);
return (queryEntityType, _enumeratorQueryableCombine, isEnumerableQuery);
}
else
{
var queryEntityType = (queryCompilerContext.GetQueryExpression() as MethodCallExpression)
.GetQueryEntityType();
var queryableCombine = GetMethodQueryableCombine(queryCompilerContext);
return (queryEntityType, queryableCombine,isEnumerableQuery);
}
}
private bool IsEnumerableQuery<TResult>(IQueryCompilerContext queryCompilerContext,bool async)
{
return !async && queryCompilerContext.GetQueryExpression().Type
.HasImplementedRawGeneric(typeof(IQueryable<>))
||
async && typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>));
}
private Type GetEnumerableQueryEntityType<TResult>(IQueryCompilerContext queryCompilerContext, bool async)
{
Type queryEntityType;
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = queryCompilerContext.GetQueryExpression().Type.GetSequenceType();
}
return queryEntityType;
}
private IQueryableCombine GetMethodQueryableCombine(IQueryCompilerContext queryCompilerContext)
{
if (queryCompilerContext.GetQueryExpression() is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Queryable.First):
case nameof(Queryable.FirstOrDefault):
case nameof(Queryable.Last):
case nameof(Queryable.LastOrDefault):
case nameof(Queryable.Single):
case nameof(Queryable.SingleOrDefault):
case nameof(Queryable.Count):
case nameof(Queryable.LongCount):
case nameof(Queryable.Any):
return _whereQueryableCombine;
case nameof(Queryable.All):
return _allQueryableCombine;
case nameof(Queryable.Max):
case nameof(Queryable.Min):
case nameof(Queryable.Sum):
case nameof(Queryable.Average):
return _selectQueryableCombine;
case nameof(Queryable.Contains):
return _constantQueryableCombine;
}
}
throw new ShardingCoreException($"query expression:[{queryCompilerContext.GetQueryExpression().ShardingPrint()}] is not terminate operate");
}
}
}

View File

@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.ShardingExecutors
{
public class QueryCompilerExecutor
{
private readonly IQueryCompiler _queryCompiler;
private readonly Expression _queryExpression;
public QueryCompilerExecutor(DbContext dbContext,Expression queryExpression)
{
_queryCompiler = dbContext.GetService<IQueryCompiler>();
_queryExpression = queryExpression.ReplaceDbContextExpression(dbContext);
}
public IQueryCompiler GetQueryCompiler()
{
return _queryCompiler;
}
public Expression GetReplaceQueryExpression()
{
return _queryExpression;
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class AllQueryCombineResult: QueryCombineResult
{
private readonly LambdaExpression _allPredicate;
public AllQueryCombineResult(LambdaExpression allPredicate,IQueryable queryable,IQueryCompilerContext queryCompilerContext) : base(queryable, queryCompilerContext)
{
_allPredicate = allPredicate;
}
public LambdaExpression GetAllPredicate()
{
return _allPredicate;
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class AllQueryableCombine : AbstractQueryableCombine
{
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
return queryable;
}
public override QueryCombineResult GetDefaultQueryCombineResult(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
LambdaExpression allPredicate = null;
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression)
{
allPredicate = lambdaExpression;
}
return new AllQueryCombineResult(allPredicate,queryable, queryCompilerContext);
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class ConstantQueryCombineResult : QueryCombineResult
{
private readonly object _constantItem;
public ConstantQueryCombineResult(object constantItem, IQueryable queryable, IQueryCompilerContext queryCompilerContext) : base(queryable, queryCompilerContext)
{
_constantItem = constantItem;
}
public object GetConstantItem()
{
return _constantItem;
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class ConstantQueryableCombine:AbstractQueryableCombine
{
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
if (!(secondExpression is ConstantExpression))
{
throw new ShardingCoreInvalidOperationException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
return queryable;
}
public override QueryCombineResult GetDefaultQueryCombineResult(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
if (!(secondExpression is ConstantExpression constantExpression))
{
throw new ShardingCoreException($"not found constant {queryCompilerContext.GetQueryExpression().ShardingPrint()}");
}
var constantItem = constantExpression.Value;
return new ConstantQueryCombineResult(constantItem, queryable, queryCompilerContext);
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class EnumeratorQueryableCombine : IQueryableCombine
{
public QueryCombineResult Combine(IQueryCompilerContext queryCompilerContext,Type queryEntityType)
{
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = (IQueryable)Activator.CreateInstance(type, queryCompilerContext.GetQueryExpression());
return new QueryCombineResult(queryable, queryCompilerContext);
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class QueryCombineResult
{
private readonly IQueryable _queryable;
private readonly IQueryCompilerContext _queryCompilerContext;
public QueryCombineResult(IQueryable queryable,IQueryCompilerContext queryCompilerContext)
{
_queryable = queryable;
_queryCompilerContext = queryCompilerContext;
}
public IQueryable GetCombineQueryable()
{
return _queryable;
}
public IQueryCompilerContext GetQueryCompilerContext()
{
return _queryCompilerContext;
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class SelectQueryCombineResult:QueryCombineResult
{
private readonly Expression _secondExpression;
public SelectQueryCombineResult(Expression secondExpression,IQueryable queryable, IQueryCompilerContext queryCompilerContext) : base(queryable, queryCompilerContext)
{
_secondExpression = secondExpression;
}
public IQueryable GetSelectCombineQueryable<TEntity, TSelect>(IQueryable<TEntity> queryable)
{
if (_secondExpression != null)
{
if (_secondExpression is UnaryExpression unaryExpression && unaryExpression.Operand is LambdaExpression lambdaExpression && lambdaExpression is Expression<Func<TEntity, TSelect>> selector)
{
return queryable.Select(selector);
}
throw new ShardingCoreException($"expression is not selector:{GetQueryCompilerContext().GetQueryExpression().ShardingPrint()}");
}
return queryable;
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class SelectQueryableCombine:AbstractQueryableCombine
{
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression,
IQueryCompilerContext queryCompilerContext)
{
return queryable;
}
public override QueryCombineResult GetDefaultQueryCombineResult(IQueryable queryable, Expression secondExpression,
IQueryCompilerContext queryCompilerContext)
{
return new SelectQueryCombineResult(secondExpression, queryable, queryCompilerContext);
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
{
public class WhereQueryableCombine: AbstractQueryableCombine
{
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
{
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression )
{
MethodCallExpression whereCallExpression = Expression.Call(
typeof(Queryable),
nameof(Queryable.Where),
new Type[] { queryable.ElementType },
queryable.Expression,lambdaExpression
);
return queryable.Provider.CreateQuery(whereCallExpression);
}
throw new ShardingCoreInvalidOperationException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
}
}

View File

@ -1,25 +1,22 @@
using System;
using System.Collections.Concurrent;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.StreamMerge.ReWrite; using ShardingCore.Core.Internal.StreamMerge.ReWrite;
using ShardingCore.Core.Internal.Visitors; using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.Internal.Visitors.GroupBys; using ShardingCore.Core.Internal.Visitors.GroupBys;
using ShardingCore.Core.Internal.Visitors.Selects; using ShardingCore.Core.Internal.Visitors.Selects;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
namespace ShardingCore.Sharding namespace ShardingCore.Sharding
@ -35,6 +32,8 @@ namespace ShardingCore.Sharding
, IAsyncDisposable , IAsyncDisposable
#endif #endif
{ {
public IMergeQueryCompilerContext MergeQueryCompilerContext { get; }
//private readonly IShardingScopeFactory _shardingScopeFactory; //private readonly IShardingScopeFactory _shardingScopeFactory;
private readonly IQueryable<TEntity> _source; private readonly IQueryable<TEntity> _source;
private readonly IShardingDbContext _shardingDbContext; private readonly IShardingDbContext _shardingDbContext;
@ -74,28 +73,21 @@ namespace ShardingCore.Sharding
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts; private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
private readonly IShardingComparer _shardingComparer; private readonly IShardingComparer _shardingComparer;
private readonly IParallelTableManager _parallelTableManager;
private readonly IEntityMetadataManager _entityMetadataManager;
public StreamMergeContext(IQueryable<TEntity> source, IShardingDbContext shardingDbContext, public StreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,
DataSourceRouteResult dataSourceRouteResult,
IEnumerable<TableRouteResult> tableRouteResults,
IRouteTailFactory routeTailFactory) IRouteTailFactory routeTailFactory)
{ {
QueryEntities = source.ParseQueryableEntities(shardingDbContext.GetType()); MergeQueryCompilerContext = mergeQueryCompilerContext;
QueryEntities = mergeQueryCompilerContext.GetQueryEntities();
//_shardingScopeFactory = shardingScopeFactory; //_shardingScopeFactory = shardingScopeFactory;
_source = source; _source = (IQueryable<TEntity>)mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
_shardingDbContext = shardingDbContext; _shardingDbContext = mergeQueryCompilerContext.GetShardingDbContext();
_routeTailFactory = routeTailFactory; _routeTailFactory = routeTailFactory;
DataSourceRouteResult = dataSourceRouteResult; DataSourceRouteResult = mergeQueryCompilerContext.GetDataSourceRouteResult();
_parallelTableManager = (IParallelTableManager)ShardingContainer.GetService(typeof(IParallelTableManager<>).GetGenericType0(shardingDbContext.GetType())); TableRouteResults = mergeQueryCompilerContext.GetTableRouteResults();
_entityMetadataManager = IsCrossDataSource = mergeQueryCompilerContext.IsCrossDataSource();
(IEntityMetadataManager)ShardingContainer.GetService( IsCrossTable = mergeQueryCompilerContext.IsCrossTable();
typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType())); var reWriteResult = new ReWriteEngine<TEntity>(_source).ReWrite();
TableRouteResults = GetTableRouteResults(tableRouteResults);
IsCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1;
IsCrossTable = TableRouteResults.Count() > 1;
var reWriteResult = new ReWriteEngine<TEntity>(source).ReWrite();
Skip = reWriteResult.Skip; Skip = reWriteResult.Skip;
Take = reWriteResult.Take; Take = reWriteResult.Take;
Orders = reWriteResult.Orders ?? Enumerable.Empty<PropertyOrder>(); Orders = reWriteResult.Orders ?? Enumerable.Empty<PropertyOrder>();
@ -105,27 +97,14 @@ namespace ShardingCore.Sharding
_reWriteSource = reWriteResult.ReWriteQueryable; _reWriteSource = reWriteResult.ReWriteQueryable;
_trackerManager = _trackerManager =
(ITrackerManager)ShardingContainer.GetService( (ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(shardingDbContext.GetType())); typeof(ITrackerManager<>).GetGenericType0(mergeQueryCompilerContext.GetShardingDbContextType()));
_shardingComparer = (IShardingComparer)ShardingContainer.GetService(typeof(IShardingComparer<>).GetGenericType0(_shardingDbContext.GetType())); _shardingComparer = (IShardingComparer)ShardingContainer.GetService(typeof(IShardingComparer<>).GetGenericType0(_shardingDbContext.GetType()));
_shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>() _shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>()
.FirstOrDefault(o => o.ShardingDbContextType == shardingDbContext.GetType()); .FirstOrDefault(o => o.ShardingDbContextType == mergeQueryCompilerContext.GetShardingDbContextType());
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>(); _parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
//RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source); //RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
} }
private IEnumerable<TableRouteResult> GetTableRouteResults(IEnumerable<TableRouteResult> tableRouteResults)
{
if (QueryEntities.Count > 1)
{
var queryShardingTables = QueryEntities.Where(o => _entityMetadataManager.IsShardingTable(o)).ToArray();
if (queryShardingTables.Length > 1 && _parallelTableManager.IsParallelTableQuery(queryShardingTables))
{
return tableRouteResults.Where(o => o.ReplaceTables.Select(p => p.Tail).ToHashSet().Count == 1);
}
}
return tableRouteResults;
}
//public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults, //public StreamMergeContext(IQueryable<T> source,IEnumerable<TableRouteResult> routeResults,
// IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory) // IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
//{ //{

View File

@ -5,6 +5,7 @@ using System.Linq;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine; using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding namespace ShardingCore.Sharding
{ {
@ -16,22 +17,15 @@ namespace ShardingCore.Sharding
*/ */
public class StreamMergeContextFactory<TShardingDbContext> : IStreamMergeContextFactory<TShardingDbContext> where TShardingDbContext:DbContext,IShardingDbContext public class StreamMergeContextFactory<TShardingDbContext> : IStreamMergeContextFactory<TShardingDbContext> where TShardingDbContext:DbContext,IShardingDbContext
{ {
private readonly IDataSourceRouteRuleEngineFactory<TShardingDbContext> _dataSourceRouteRuleEngineFactory;
private readonly ITableRouteRuleEngineFactory<TShardingDbContext> _tableRouteRuleEngineFactory;
private readonly IRouteTailFactory _routeTailFactory; private readonly IRouteTailFactory _routeTailFactory;
public StreamMergeContextFactory(IDataSourceRouteRuleEngineFactory<TShardingDbContext> dataSourceRouteRuleEngineFactory, public StreamMergeContextFactory(IRouteTailFactory routeTailFactory)
ITableRouteRuleEngineFactory<TShardingDbContext> tableRouteRuleEngineFactory,IRouteTailFactory routeTailFactory)
{ {
_dataSourceRouteRuleEngineFactory = dataSourceRouteRuleEngineFactory;
_tableRouteRuleEngineFactory = tableRouteRuleEngineFactory;
_routeTailFactory = routeTailFactory; _routeTailFactory = routeTailFactory;
} }
public StreamMergeContext<T> Create<T>(IQueryable<T> queryable,IShardingDbContext shardingDbContext) public StreamMergeContext<T> Create<T>(IMergeQueryCompilerContext mergeQueryCompilerContext)
{ {
var dataSourceRouteResult = _dataSourceRouteRuleEngineFactory.Route(queryable); return new StreamMergeContext<T>(mergeQueryCompilerContext, _routeTailFactory);
var tableRouteResults = _tableRouteRuleEngineFactory.Route(queryable);
return new StreamMergeContext<T>(queryable,shardingDbContext, dataSourceRouteResult, tableRouteResults, _routeTailFactory);
} }
} }
} }

View File

@ -828,8 +828,8 @@ namespace ShardingCore.Test
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException),e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException),e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }
@ -879,8 +879,8 @@ namespace ShardingCore.Test
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException), e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException), e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }

View File

@ -825,8 +825,8 @@ namespace ShardingCore.Test2x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException),e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException),e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }
@ -876,8 +876,8 @@ namespace ShardingCore.Test2x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException), e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException), e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }

View File

@ -827,8 +827,8 @@ namespace ShardingCore.Test3x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException),e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException),e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }
@ -878,8 +878,8 @@ namespace ShardingCore.Test3x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException), e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException), e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }

View File

@ -826,8 +826,8 @@ namespace ShardingCore.Test5x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException),e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException),e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }
@ -877,8 +877,8 @@ namespace ShardingCore.Test5x
} }
catch (Exception e) catch (Exception e)
{ {
Assert.Equal(typeof(InvalidOperationException), e.InnerException.GetType()); Assert.Equal(typeof(InvalidOperationException), e.GetType());
Assert.True(e.InnerException.Message.Contains("contains")); Assert.True(e.Message.Contains("contains"));
} }
} }
} }