添加shardingcore的查询默认不分表走默认的编译结果查询,支持所有的不分表操作

This commit is contained in:
xuejiaming 2021-12-16 21:14:28 +08:00
parent 888ba0ed5a
commit 9d3b389de0
13 changed files with 144 additions and 13 deletions

View File

@ -52,5 +52,14 @@ namespace ShardingCore.Core.EntityMetadatas
return null;
return entityMetadata;
}
/// <summary>
/// 是否是分片对象(包括分表或者分库)
/// </summary>
/// <param name="entityType"></param>
/// <returns></returns>
public bool IsSharding(Type entityType)
{
return _caches.ContainsKey(entityType);
}
}
}

View File

@ -33,5 +33,11 @@ namespace ShardingCore.Core.EntityMetadatas
/// <param name="entityType"></param>
/// <returns></returns>
EntityMetadata TryGet(Type entityType);
/// <summary>
/// 是否是分片对象(包括分表或者分库)
/// </summary>
/// <param name="entityType"></param>
/// <returns></returns>
bool IsSharding(Type entityType);
}
}

View File

@ -66,9 +66,12 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions
public abstract List<string> GetAllDataSourceNames();
public abstract bool AddDataSourceName(string dataSourceName);
public virtual void Configure(EntityMetadataDataSourceBuilder<T> builder)
{
}
/// <summary>
/// 配置分库的一些信息
/// 1.ShardingProperty 哪个字段分库
/// 2.AutoCreateDataSource 启动时是否需要创建数据源
/// </summary>
/// <param name="builder"></param>
public abstract void Configure(EntityMetadataDataSourceBuilder<T> builder);
}
}

View File

@ -65,7 +65,10 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
public abstract List<string> GetAllTails();
/// <summary>
/// 配置分表信息
/// 配置分表的一些信息
/// 1.ShardingProperty 哪个字段分表
/// 2.TableSeparator 分表的后缀和表名的连接符
/// 3.AutoCreateTable 启动时是否需要创建对应的分表信息
/// </summary>
/// <param name="builder"></param>
public abstract void Configure(EntityMetadataTableBuilder<T> builder);

View File

@ -6,10 +6,17 @@ using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Utils;
namespace ShardingCore.EFCores
@ -31,19 +38,50 @@ namespace ShardingCore.EFCores
_shardingQueryExecutor = ShardingContainer.GetService<IShardingQueryExecutor>();
}
public TResult Execute<TResult>(Expression query)
{
return _shardingQueryExecutor.Execute<TResult>(_currentContext, query);
if (_currentContext.Context is IShardingDbContext shardingDbContext)
{
var queryCompilerIfNoShardingQuery = GetQueryCompilerIfNoShardingQuery(shardingDbContext, query);
if (queryCompilerIfNoShardingQuery != null)
{
return queryCompilerIfNoShardingQuery.Execute<TResult>(query);
}
return _shardingQueryExecutor.Execute<TResult>(_currentContext, query);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
private IQueryCompiler GetQueryCompilerIfNoShardingQuery(IShardingDbContext shardingDbContext, Expression query)
{
var queryEntities = ShardingUtil.GetQueryEntitiesByExpression(query);
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
if (queryEntities.All(o => !entityMetadataManager.IsSharding(o)))
{
var virtualDataSource = (IVirtualDataSource)ShardingContainer.GetService(
typeof(IVirtualDataSource<>).GetGenericType0(shardingDbContext.GetType()));
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var dbContext = shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, false, routeTailFactory.Create(string.Empty));
return dbContext.GetService<IQueryCompiler>();
}
return null;
}
#if !EFCORE2
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
if (_currentContext.Context is IShardingDbContext shardingDbContext)
{
var queryCompilerIfNoShardingQuery = GetQueryCompilerIfNoShardingQuery(shardingDbContext, query);
if (queryCompilerIfNoShardingQuery != null)
{
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(query, cancellationToken);
}
return _shardingQueryExecutor.ExecuteAsync<TResult>(_currentContext, query, cancellationToken);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
[ExcludeFromCodeCoverage]
@ -71,12 +109,30 @@ namespace ShardingCore.EFCores
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query);
if (_currentContext.Context is IShardingDbContext shardingDbContext)
{
var queryCompilerIfNoShardingQuery = GetQueryCompilerIfNoShardingQuery(shardingDbContext, query);
if (queryCompilerIfNoShardingQuery != null)
{
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(query);
}
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(_currentContext, query, cancellationToken);
if (_currentContext.Context is IShardingDbContext shardingDbContext)
{
var queryCompilerIfNoShardingQuery = GetQueryCompilerIfNoShardingQuery(shardingDbContext, query);
if (queryCompilerIfNoShardingQuery != null)
{
return queryCompilerIfNoShardingQuery.ExecuteAsync<TResult>(query,cancellationToken);
}
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(_currentContext, query, cancellationToken);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
[ExcludeFromCodeCoverage]

View File

@ -67,7 +67,16 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
// }
//}
/// <summary>
/// 将查询分表分库结果按每个数据源进行分组
/// 每组大小为 启动配置的<see cref="IShardingConfigOption.MaxQueryConnectionsLimit"/>数目
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="async"></param>
/// <param name="sqlRouteUnits"></param>
/// <param name="sqlExecutorUnitExecuteAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async,IEnumerable<ISqlRouteUnit> sqlRouteUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits)
@ -138,7 +147,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
return sqlRouteUnits.GroupBy(o => o.DataSourceName);
}
/// <summary>
/// 每个数据源下的分表结果按 maxQueryConnectionsLimit 进行组合分组每组大小 maxQueryConnectionsLimit
/// ConnectionModeEnum为用户配置或者系统自动计算,哪怕是用户指定也是按照maxQueryConnectionsLimit来进行分组。
/// </summary>
/// <param name="sqlGroups"></param>
/// <returns></returns>
protected DataSourceSqlExecutorUnit GetSqlExecutorGroups(IGrouping<string, ISqlRouteUnit> sqlGroups)
{
var streamMergeContext = GetStreamMergeContext();

View File

@ -105,7 +105,8 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public override void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults,bool async)
{
if (previewResults.Count > 1)
var previewResultsCount = previewResults.Count;
if (previewResultsCount > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
@ -117,7 +118,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TEntity>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TEntity>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TEntity>>();
if (previewInMemoryStreamEnumeratorResults.Count == 1)
if (previewResultsCount == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}

View File

@ -7,8 +7,13 @@ using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -16,6 +21,7 @@ using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines;
using ShardingCore.Utils;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
#endif
@ -54,6 +60,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public TResult ExecuteAsync<TResult>(ICurrentDbContext currentContext, Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var currentDbContext = currentContext.Context;

View File

@ -74,6 +74,14 @@ namespace ShardingCore.Utils
return visitor.GetQueryEntities();
}
public static ISet<Type> GetQueryEntitiesByExpression(Expression expression)
{
QueryEntitiesVisitor visitor = new QueryEntitiesVisitor();
visitor.Visit(expression);
return visitor.GetQueryEntities();
}
}
}

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions;
using ShardingCore.Test.Domain.Entities;
@ -36,6 +37,11 @@ namespace ShardingCore.Test.Shardings
return true;
}
public override void Configure(EntityMetadataDataSourceBuilder<Order> builder)
{
}
public override Expression<Func<string, bool>> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions;
using ShardingCore.Test2x.Domain.Entities;
@ -36,6 +37,11 @@ namespace ShardingCore.Test2x.Shardings
return true;
}
public override void Configure(EntityMetadataDataSourceBuilder<Order> builder)
{
}
public override Expression<Func<string, bool>> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions;
using ShardingCore.Test3x.Domain.Entities;
@ -36,6 +37,11 @@ namespace ShardingCore.Test3x.Shardings
return true;
}
public override void Configure(EntityMetadataDataSourceBuilder<Order> builder)
{
}
public override Expression<Func<string, bool>> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions;
using ShardingCore.Test5x.Domain.Entities;
@ -36,6 +37,11 @@ namespace ShardingCore.Test5x.Shardings
return true;
}
public override void Configure(EntityMetadataDataSourceBuilder<Order> builder)
{
}
public override Expression<Func<string, bool>> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{