优化部分代码,1表达式值获取[#96],union的支持[#95]和降级到union all的支持[#97],并且发布x.4.0.2版本

This commit is contained in:
xuejiaming 2022-01-17 00:04:50 +08:00
parent 09df58536a
commit 2337daa68c
33 changed files with 613 additions and 45 deletions

View File

@ -1,9 +1,9 @@
:start
::定义版本
set EFCORE2=2.4.0.01
set EFCORE3=3.4.0.01
set EFCORE5=5.4.0.01
set EFCORE6=6.4.0.01
set EFCORE2=2.4.0.02
set EFCORE3=3.4.0.02
set EFCORE5=5.4.0.02
set EFCORE6=6.4.0.02
::删除所有bin与obj下的文件
@echo off

View File

@ -59,5 +59,10 @@ namespace Sample.MultiConfig.Controllers
dbContextOptionsBuilder.UseMySql(dbConnection, new MySqlServerVersion(new Version()));
return dbContextOptionsBuilder;
}
public override void UseInnerDbContextOptionBuilder(DbContextOptionsBuilder dbContextOptionsBuilder)
{
}
}
}

View File

@ -18,6 +18,16 @@
},
"applicationUrl": "http://localhost:5138",
"dotnetRunMessages": true
},
"WSL": {
"commandName": "WSL2",
"launchBrowser": true,
"launchUrl": "http://localhost:5138/weatherforecast",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"ASPNETCORE_URLS": "http://localhost:5138"
},
"distributionName": ""
}
}
}

View File

@ -132,8 +132,20 @@ namespace Sample.SqlServer.Controllers
//await _defaultTableDbContext.SaveChangesAsync();
var sresultx1121222 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").MaxAsync(o => o.Age);
var unionUserIds = await _defaultTableDbContext.Set<SysUserMod>().Select(o=>new UnionUserId(){UserId = o.Id})
.Union(_defaultTableDbContext.Set<SysUserSalary>().Select(o => new UnionUserId() { UserId = o.UserId })).ToListAsync();
var unionUserIdCounts = await _defaultTableDbContext.Set<SysUserMod>().Select(o=>new UnionUserId(){UserId = o.Id})
.Union(_defaultTableDbContext.Set<SysUserSalary>().Select(o => new UnionUserId() { UserId = o.UserId })).CountAsync();
var hashSet = unionUserIds.Select(o=>o.UserId).ToHashSet();
var hashSetCount = hashSet.Count;
return Ok();
}
public class UnionUserId
{
public string UserId { get; set; }
}
[HttpGet]
public async Task<IActionResult> Get1([FromQuery] int p, [FromQuery] int s)
{

View File

@ -0,0 +1,84 @@
using System.Linq.Expressions;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
namespace Sample.SqlServer
{
public class NotSupportShardingCompiler : QueryCompiler
{
private readonly IQueryContextFactory _queryContextFactory;
private readonly IDatabase _database;
private readonly IDiagnosticsLogger<DbLoggerCategory.Query> _logger;
private readonly IModel _model;
public NotSupportShardingCompiler(IQueryContextFactory queryContextFactory, ICompiledQueryCache compiledQueryCache, ICompiledQueryCacheKeyGenerator compiledQueryCacheKeyGenerator, IDatabase database, IDiagnosticsLogger<DbLoggerCategory.Query> logger, ICurrentDbContext currentContext, IEvaluatableExpressionFilter evaluatableExpressionFilter, IModel model) : base(queryContextFactory, compiledQueryCache, compiledQueryCacheKeyGenerator, database, logger, currentContext, evaluatableExpressionFilter, model)
{
_queryContextFactory = queryContextFactory;
_database = database;
_logger = logger;
_model = model;
}
public override TResult Execute<TResult>(Expression query)
{
var notSupportManager = ShardingContainer.GetService<INotSupportManager>();
if (notSupportManager?.Current != null)
{
return NotSupportShardingExecute<TResult>(query);
}
return base.Execute<TResult>(query);
}
/// <summary>
/// use no compiler
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="query"></param>
/// <returns></returns>
private TResult NotSupportShardingExecute<TResult>(Expression query)
{
var queryContext = _queryContextFactory.Create();
query = ExtractParameters(query, queryContext, _logger);
var compiledQuery
= CompileQueryCore<TResult>(_database, query, _model, false);
return compiledQuery(queryContext);
}
public override TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var notSupportManager = ShardingContainer.GetService<INotSupportManager>();
if (notSupportManager?.Current != null)
{
var result = NotSupportShardingExecuteAsync<TResult>(query, cancellationToken);
return result;
}
return base.ExecuteAsync<TResult>(query, cancellationToken);
}
private TResult NotSupportShardingExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var queryContext = _queryContextFactory.Create();
queryContext.CancellationToken = cancellationToken;
query = ExtractParameters(query, queryContext, _logger);
var compiledQuery
= CompileQueryCore<TResult>(_database, query, _model, true);
return compiledQuery(queryContext);
}
}
}

View File

@ -0,0 +1,76 @@
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.SqlExpressions;
using Microsoft.EntityFrameworkCore.SqlServer.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace Sample.SqlServer
{
public class ShardingSqlServerQuerySqlGeneratorFactory<TShardingDbContext> : IQuerySqlGeneratorFactory
where TShardingDbContext:DbContext,IShardingDbContext
{
public ShardingSqlServerQuerySqlGeneratorFactory(QuerySqlGeneratorDependencies dependencies)
{
Dependencies = dependencies;
}
public QuerySqlGeneratorDependencies Dependencies { get; }
public QuerySqlGenerator Create() => new ShardingSqlServerQuerySqlGenerator<TShardingDbContext>(Dependencies);
}
public class ShardingSqlServerQuerySqlGenerator<TShardingDbContext> : SqlServerQuerySqlGenerator
where TShardingDbContext : DbContext, IShardingDbContext
{
public ShardingSqlServerQuerySqlGenerator(QuerySqlGeneratorDependencies dependencies)
: base(dependencies)
{
}
protected override Expression VisitTable(TableExpression tableExpression)
{
return OverrideVisitTable(tableExpression);
// this._relationalCommandBuilder.Append((object) this._sqlGenerationHelper.DelimitIdentifier(tableExpression.Name, tableExpression.Schema)).Append((object) this.AliasSeparator).Append((object) this._sqlGenerationHelper.DelimitIdentifier(tableExpression.Alias));
// return (Expression) tableExpression;
// typeof(TableExpression)
// .GetFields( BindingFlags.Instance | BindingFlags.NonPublic).FirstOrDefault(o=>o.Name.Contains(nameof(tableExpression.Name)))
// .SetValue(tableExpression,"(select * from Log_1Message union all select * from Log_1Message)");
// base will append schema, table and alias
}
private Expression OverrideVisitTable(TableExpression tableExpression)
{
var supportManager = ShardingContainer.GetService<INotSupportManager>();
if (supportManager?.Current != null)
{
var tableRouteResults = supportManager?.Current.TableRoutesResults.ToArray();
if (tableRouteResults.IsNotEmpty() &&
tableRouteResults[0].ReplaceTables.Any(o => o.OriginalName == tableExpression.Name))
{
var tails = tableRouteResults.Select(o=>o.ReplaceTables.FirstOrDefault(r=>r.OriginalName==tableExpression.Name).Tail).ToHashSet();
var sqlGenerationHelper = typeof(QuerySqlGenerator).GetTypeFieldValue(this, "_sqlGenerationHelper") as ISqlGenerationHelper;
var tableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
var virtualTable = tableManager.GetVirtualTable(tableExpression.Name);
var newTableName = "(" + string.Join(" union all ", tails.Select(tail => $"select * from {sqlGenerationHelper.DelimitIdentifier($"{tableExpression.Name}{virtualTable.EntityMetadata.TableSeparator}{tail}", tableExpression.Schema)}")) + ")";
var relationalCommandBuilder = typeof(QuerySqlGenerator).GetTypeFieldValue(this, "_relationalCommandBuilder") as IRelationalCommandBuilder;
relationalCommandBuilder.Append(newTableName).Append(this.AliasSeparator).Append(sqlGenerationHelper.DelimitIdentifier(tableExpression.Alias));
return tableExpression;
}
}
var result = base.VisitTable(tableExpression);
return result;
}
}
}

View File

@ -11,7 +11,12 @@ using ShardingCore.Sharding.ReadWriteConfigurations;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ShardingCore.Core;
using ShardingCore.Core.NotSupportShardingProviders;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.TableExists;
namespace Sample.SqlServer
@ -37,22 +42,30 @@ namespace Sample.SqlServer
o.AddShardingTableRoute<SysUserModVirtualTableRoute>();
o.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
o.AddShardingTableRoute<TestYearShardingVirtualTableRoute>();
o.UseInnerDbContextConfigure(builder =>
{
builder
.ReplaceService<IQuerySqlGeneratorFactory,
ShardingSqlServerQuerySqlGeneratorFactory<DefaultShardingDbContext>>()
.ReplaceService<IQueryCompiler, NotSupportShardingCompiler>();
});
})
.AddConfig(op =>
{
op.ConfigId = "c1";
op.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger).ReplaceService<IQuerySqlGeneratorFactory, ShardingSqlServerQuerySqlGeneratorFactory<DefaultShardingDbContext>>();
});
op.UseShardingTransaction((connection, builder) =>
{
builder.UseSqlServer(connection).UseLoggerFactory(efLogger);
builder.UseSqlServer(connection).UseLoggerFactory(efLogger).ReplaceService<IQuerySqlGeneratorFactory, ShardingSqlServerQuerySqlGeneratorFactory<DefaultShardingDbContext>>();
});
op.ReplaceTableEnsureManager(sp => new SqlServerTableEnsureManager<DefaultShardingDbContext>());
op.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;");
}).EnsureConfig();
services.TryAddSingleton<INotSupportShardingProvider, UnionSupportShardingProvider>();
//services.AddShardingDbContext<DefaultShardingDbContext1>(
// (conn, o) =>
// o.UseSqlServer(conn).UseLoggerFactory(efLogger)
@ -117,4 +130,16 @@ namespace Sample.SqlServer
app.DbSeed();
}
}
public class UnionSupportShardingProvider : INotSupportShardingProvider
{
public void CheckNotSupportSharding(IQueryCompilerContext queryCompilerContext)
{
}
public bool IsNotSupportSharding(IQueryCompilerContext queryCompilerContext)
{
return queryCompilerContext.isUnion();
}
}
}

View File

@ -0,0 +1,9 @@
using ShardingCore.Core.CustomerDatabaseProcessers;
namespace ShardingCore.Core.NotSupportShardingProviders.Abstractions
{
public interface INotSupportAccessor
{
NotSupportContext SqlSupportContext { get; set; }
}
}

View File

@ -0,0 +1,17 @@
using System.Collections.Generic;
using ShardingCore.Core.CustomerDatabaseProcessers;
using ShardingCore.Core.CustomerDatabaseSqlSupports;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Core.NotSupportShardingProviders.Abstractions
{
public interface INotSupportManager
{
NotSupportContext Current { get; }
/// <summary>
/// 创建scope
/// </summary>
/// <returns></returns>
NotSupportScope CreateScope(IEnumerable<TableRouteResult> tableRouteResults);
}
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Core.NotSupportShardingProviders
{
public class DefaultNotSupportShardingProvider: INotSupportShardingProvider
{
public void CheckNotSupportSharding(IQueryCompilerContext queryCompilerContext)
{
if (IsNotSupportSharding(queryCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"not support sharding query :[{queryCompilerContext.GetQueryExpression().ShardingPrint()}]");
}
public bool IsNotSupportSharding(IQueryCompilerContext queryCompilerContext)
{
return queryCompilerContext.isUnion();
}
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Core.NotSupportShardingProviders
{
public interface INotSupportShardingProvider
{
void CheckNotSupportSharding(IQueryCompilerContext queryCompilerContext);
bool IsNotSupportSharding(IQueryCompilerContext queryCompilerContext);
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.CustomerDatabaseProcessers;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
namespace ShardingCore.Core.CustomerDatabaseSqlSupports
{
public class NotSupportAccessor: INotSupportAccessor
{
private static AsyncLocal<NotSupportContext> _sqlSupportContext = new AsyncLocal<NotSupportContext>();
public NotSupportContext SqlSupportContext
{
get => _sqlSupportContext.Value;
set => _sqlSupportContext.Value = value;
}
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Core.CustomerDatabaseProcessers
{
public class NotSupportContext
{
public NotSupportContext(IEnumerable<TableRouteResult> tableRoutesResults)
{
TableRoutesResults = tableRoutesResults;
}
public IEnumerable<TableRouteResult> TableRoutesResults { get; }
}
}

View File

@ -0,0 +1,24 @@
using System.Collections.Generic;
using ShardingCore.Core.CustomerDatabaseProcessers;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Core.CustomerDatabaseSqlSupports
{
public class NotSupportManager: INotSupportManager
{
private readonly INotSupportAccessor _notSupportAccessor;
public NotSupportContext Current => _notSupportAccessor.SqlSupportContext;
public NotSupportManager(INotSupportAccessor notSupportAccessor)
{
_notSupportAccessor = notSupportAccessor;
}
public NotSupportScope CreateScope(IEnumerable<TableRouteResult> tableRouteResults)
{
var scope = new NotSupportScope(_notSupportAccessor);
_notSupportAccessor.SqlSupportContext = new NotSupportContext(tableRouteResults);
return scope;
}
}
}

View File

@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
namespace ShardingCore.Core.CustomerDatabaseSqlSupports
{
public class NotSupportScope:IDisposable
{
public INotSupportAccessor NotSupportAccessor { get; }
/// <summary>
/// 构造函数
/// </summary>
/// <param name="notSupportAccessor"></param>
public NotSupportScope(INotSupportAccessor notSupportAccessor)
{
NotSupportAccessor = notSupportAccessor;
}
public void Dispose()
{
NotSupportAccessor.SqlSupportContext = null;
}
}
}

View File

@ -99,6 +99,7 @@ namespace ShardingCore.Core.ShardingConfigurations.Abstractions
/// DbContext如何通过连接字符串创建
/// </summary>
Action<string, DbContextOptionsBuilder> ConnectionStringConfigure { get; }
Action<DbContextOptionsBuilder> InnerDbContextConfigure { get; }
/// <summary>
/// DbContext如何通过连接字符串创建
@ -108,5 +109,7 @@ namespace ShardingCore.Core.ShardingConfigurations.Abstractions
/// DbContext如何通过现有connection创建
/// </summary>
public void UseShardingTransaction(Action<DbConnection, DbContextOptionsBuilder> transactionConfigure);
void UseInnerDbContextConfigure(Action<DbContextOptionsBuilder> innerDbContextConfigure);
}
}

View File

@ -95,6 +95,11 @@ namespace ShardingCore.Core.ShardingConfigurations
ConnectionConfigure = transactionConfigure ?? throw new ArgumentNullException(nameof(transactionConfigure));
}
public Action<DbContextOptionsBuilder> InnerDbContextConfigure { get; private set; }
public void UseInnerDbContextConfigure(Action<DbContextOptionsBuilder> innerDbContextConfigure)
{
InnerDbContextConfigure= innerDbContextConfigure?? throw new ArgumentNullException(nameof(innerDbContextConfigure));
}
public Func<IServiceProvider, IShardingComparer> ReplaceShardingComparerFactory { get; private set; } = sp => new CSharpLanguageShardingComparer();
/// <summary>
/// 替换默认的比较器

View File

@ -150,6 +150,8 @@ namespace ShardingCore.Core.ShardingConfigurations
/// </summary>
public Action<string, DbContextOptionsBuilder> ConnectionStringConfigure { get; private set; }
public Action<DbContextOptionsBuilder> InnerDbContextConfigure { get; private set; }
/// <summary>
/// DbContext如何通过连接字符串创建
@ -165,5 +167,10 @@ namespace ShardingCore.Core.ShardingConfigurations
{
ConnectionConfigure = transactionConfigure ?? throw new ArgumentNullException(nameof(transactionConfigure));
}
public void UseInnerDbContextConfigure(Action<DbContextOptionsBuilder> innerDbContextConfigure)
{
InnerDbContextConfigure = innerDbContextConfigure ?? throw new ArgumentNullException(nameof(innerDbContextConfigure));
}
}
}

View File

@ -18,7 +18,6 @@ namespace ShardingCore.Core.ShardingPage
private static AsyncLocal<ShardingPageContext> _shardingPageContext = new AsyncLocal<ShardingPageContext>();
/// <inheritdoc />
public ShardingPageContext ShardingPageContext
{
get => _shardingPageContext.Value;

View File

@ -46,6 +46,8 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
public abstract DbContextOptionsBuilder UseDbContextOptionsBuilder(DbConnection dbConnection,
DbContextOptionsBuilder dbContextOptionsBuilder);
public abstract void UseInnerDbContextOptionBuilder(DbContextOptionsBuilder dbContextOptionsBuilder);
public virtual bool UseReadWriteSeparation()
{
return ReadWriteSeparationConfigs!=null;

View File

@ -68,6 +68,8 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions
/// <returns></returns>
DbContextOptionsBuilder UseDbContextOptionsBuilder(DbConnection dbConnection, DbContextOptionsBuilder dbContextOptionsBuilder);
void UseInnerDbContextOptionBuilder(DbContextOptionsBuilder dbContextOptionsBuilder);
bool UseReadWriteSeparation();
}

View File

@ -96,5 +96,22 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
}
return dbContextOptionsBuilder;
}
public override void UseInnerDbContextOptionBuilder(DbContextOptionsBuilder dbContextOptionsBuilder)
{
if (_options.InnerDbContextConfigure == null && _shardingEntityConfigOptions.InnerDbContextConfigure == null)
{
return;
}
if (_options.InnerDbContextConfigure != null)
{
_options.InnerDbContextConfigure.Invoke(dbContextOptionsBuilder);
}
else
{
_shardingEntityConfigOptions.InnerDbContextConfigure?.Invoke(dbContextOptionsBuilder);
}
}
}
}

View File

@ -228,6 +228,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
var doUseDbContextOptionsBuilder = ConfigurationParams.UseDbContextOptionsBuilder(connectionString, dbContextOptionsBuilder);
doUseDbContextOptionsBuilder.UseInnerDbContextSharding<TShardingDbContext>();
ConfigurationParams.UseInnerDbContextOptionBuilder(dbContextOptionsBuilder);
return doUseDbContextOptionsBuilder;
}
@ -236,6 +237,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
var doUseDbContextOptionsBuilder = ConfigurationParams.UseDbContextOptionsBuilder(dbConnection, dbContextOptionsBuilder);
doUseDbContextOptionsBuilder.UseInnerDbContextSharding<TShardingDbContext>();
ConfigurationParams.UseInnerDbContextOptionBuilder(dbContextOptionsBuilder);
return doUseDbContextOptionsBuilder;
}

View File

@ -29,6 +29,8 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.TableCreator;
using System;
using ShardingCore.Core.CustomerDatabaseSqlSupports;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ShardingConfigurations;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
@ -131,6 +133,8 @@ namespace ShardingCore
services.TryAddSingleton<IShardingPageManager, ShardingPageManager>();
services.TryAddSingleton<IShardingPageAccessor, ShardingPageAccessor>();
services.TryAddSingleton<IShardingBootstrapper, ShardingBootstrapper>();
services.TryAddSingleton<INotSupportManager, NotSupportManager>();
services.TryAddSingleton<INotSupportAccessor, NotSupportAccessor>();
services.TryAddSingleton<IQueryTracker, QueryTracker>();
services.TryAddSingleton<IShardingTrackQueryExecutor, DefaultShardingTrackQueryExecutor>();
services.TryAddSingleton<INativeTrackQueryExecutor, NativeTrackQueryExecutor>();

View File

@ -47,29 +47,30 @@ namespace ShardingCore.Extensions
{
var entityType = obj.GetType();
PropertyInfo property;
Expression propertyAccess;
var parameter = Expression.Parameter(entityType, "o");
//Expression propertyAccess;
//var parameter = Expression.Parameter(entityType, "o");
if (propertyExpression.Contains("."))
{
String[] childProperties = propertyExpression.Split('.');
property = entityType.GetProperty(childProperties[0]);
propertyAccess = Expression.MakeMemberAccess(parameter, property);
//propertyAccess = Expression.MakeMemberAccess(parameter, property);
for (int i = 1; i < childProperties.Length; i++)
{
property = property.PropertyType.GetProperty(childProperties[i]);
propertyAccess = Expression.MakeMemberAccess(propertyAccess, property);
//propertyAccess = Expression.MakeMemberAccess(propertyAccess, property);
}
}
else
{
property = entityType.GetProperty(propertyExpression);
propertyAccess = Expression.MakeMemberAccess(parameter, property);
//propertyAccess = Expression.MakeMemberAccess(parameter, property);
}
var lambda = Expression.Lambda(propertyAccess, parameter);
Delegate fn = lambda.Compile();
return fn.DynamicInvoke(obj);
return property.GetValue(obj);
//var lambda = Expression.Lambda(propertyAccess, parameter);
//Delegate fn = lambda.Compile();
//return fn.DynamicInvoke(obj);
}
/// <summary>

View File

@ -80,6 +80,16 @@ namespace ShardingCore.Extensions
queryableTrackingDiscoverVisitor.Visit(expression);
return queryableTrackingDiscoverVisitor.IsNoTracking;
}
internal static bool GetIsUnion(this IQueryable source)
{
return GetIsUnion(source.Expression);
}
internal static bool GetIsUnion(this Expression expression)
{
var queryableUnionDiscoverVisitor = new QueryableUnionDiscoverVisitor();
queryableUnionDiscoverVisitor.Visit(expression);
return queryableUnionDiscoverVisitor.IsUnion;
}
/// <summary>
/// 切换数据源,保留原始数据源中的Expression

View File

@ -9,6 +9,8 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.NotSupportShardingProviders;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
@ -25,7 +27,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
*/
internal abstract class AbstractBaseMergeEngine<TEntity>
{
protected abstract StreamMergeContext<TEntity> GetStreamMergeContext();
///// <summary>
///// 异步多线程控制并发
@ -67,6 +68,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
// }
//}
protected bool IsUnSupport()
{
return GetStreamMergeContext().IsUnSupportSharding();
}
/// <summary>
/// 将查询分表分库结果按每个数据源进行分组
/// 每组大小为 启动配置的<see cref="IShardingConfigOption.MaxQueryConnectionsLimit"/>数目
@ -77,7 +82,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
/// <param name="sqlExecutorUnitExecuteAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async,IEnumerable<ISqlRouteUnit> sqlRouteUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits)
.Select(GetSqlExecutorGroups)
@ -85,42 +90,93 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
return Task.Run(async () =>
{
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
if (IsUnSupport())
{
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
var customerDatabaseSqlSupportManager = ShardingContainer.GetService<INotSupportManager>();
using (customerDatabaseSqlSupportManager.CreateScope(
((UnSupportSqlRouteUnit)dataSourceSqlExecutorUnit.SqlExecutorGroups[0].Groups[0]
.RouteUnit).TableRouteResults))
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
return await DoExecuteAsync(async, dataSourceSqlExecutorUnit, sqlExecutorUnitExecuteAsync, cancellationToken);
}
}
else
{
return await DoExecuteAsync(async, dataSourceSqlExecutorUnit, sqlExecutorUnitExecuteAsync, cancellationToken);
}
// var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
// LinkedList<TResult> result = new LinkedList<TResult>();
// //同数据库下多组数据间采用串行
// foreach (var executorGroup in executorGroups)
// {
// //同组采用并行最大化用户配置链接数
// var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
// //严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
// if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
// {
// MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
// var dbContexts = routeQueryResults.Select(o => o.DbContext);
// foreach (var dbContext in dbContexts)
// {
//#if !EFCORE2
// await dbContext.DisposeAsync();
//#endif
//#if EFCORE2
// dbContext.Dispose();
//#endif
// }
// }
// else
// {
// foreach (var routeQueryResult in routeQueryResults)
// {
// result.AddLast(routeQueryResult.MergeResult);
// }
// }
// }
// return result;
}, cancellationToken);
}).ToArray();
return waitTaskQueue;
}
public async Task<LinkedList<TResult>> DoExecuteAsync<TResult>(bool async,DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
#if !EFCORE2
await dbContext.DisposeAsync();
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
}
return result;
}, cancellationToken);
}).ToArray();
return waitTaskQueue;
return result;
}
public virtual void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
@ -139,8 +195,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
return streamMergeContext.DataSourceRouteResult.IntersectDataSources.SelectMany(
dataSourceName =>
{
if (IsUnSupport())
{
return new []{ (ISqlRouteUnit)new UnSupportSqlRouteUnit(dataSourceName, streamMergeContext.TableRouteResults) };
}
return streamMergeContext.TableRouteResults.Select(routeResult =>
new SqlRouteUnit(dataSourceName, routeResult));
(ISqlRouteUnit)new SqlRouteUnit(dataSourceName, routeResult));
});
}
protected virtual IEnumerable<IGrouping<string, ISqlRouteUnit>> AggregateQueryByDataSourceName(IEnumerable<ISqlRouteUnit> sqlRouteUnits)

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Common
{
public class UnSupportSqlRouteUnit:ISqlRouteUnit
{
public UnSupportSqlRouteUnit(string dataSourceName, IEnumerable<TableRouteResult> tableRouteResults)
{
DataSourceName = dataSourceName;
var routeResults = tableRouteResults.ToArray();
TableRouteResults = routeResults;
TableRouteResult = new TableRouteResult(new List<IPhysicTable>(0), routeResults[0].ShardingDbContextType);
}
public string DataSourceName { get; }
public TableRouteResult TableRouteResult { get; }
public IEnumerable<TableRouteResult> TableRouteResults { get; }
}
}

View File

@ -31,5 +31,7 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
/// </summary>
/// <returns></returns>
bool IsQueryTrack();
bool isUnion();
}
}

View File

@ -104,6 +104,11 @@ namespace ShardingCore.Sharding.ShardingExecutors
return _queryCompilerContext.IsQueryTrack();
}
public bool isUnion()
{
return _queryCompilerContext.isUnion();
}
public QueryCompilerExecutor GetQueryCompilerExecutor()
{
if (!hasQueryCompilerExecutor.HasValue)

View File

@ -24,6 +24,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
private QueryCompilerExecutor _queryCompilerExecutor;
private bool? hasQueryCompilerExecutor;
private bool? _isNoTracking;
private bool _isUnion;
private readonly bool _isParallelQuery;
private QueryCompilerContext( IShardingDbContext shardingDbContext, Expression queryExpression)
@ -31,6 +32,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
_shardingDbContextType = shardingDbContext.GetType();
_queryEntities = ShardingUtil.GetQueryEntitiesByExpression(queryExpression, _shardingDbContextType);
_isNoTracking = queryExpression.GetIsNoTracking();
_isUnion = queryExpression.GetIsUnion();
_shardingDbContext = shardingDbContext;
_queryExpression = queryExpression;
_entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(_shardingDbContextType));
@ -91,6 +93,11 @@ namespace ShardingCore.Sharding.ShardingExecutors
}
}
public bool isUnion()
{
return _isUnion;
}
public QueryCompilerExecutor GetQueryCompilerExecutor()
{
if (!hasQueryCompilerExecutor.HasValue)

View File

@ -19,6 +19,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.NotSupportShardingProviders;
namespace ShardingCore.Sharding
@ -34,6 +35,11 @@ namespace ShardingCore.Sharding
, IAsyncDisposable
#endif
{
private readonly INotSupportShardingProvider _notSupportShardingProvider;
private static readonly INotSupportShardingProvider _defaultNotSupportShardingProvider =
new DefaultNotSupportShardingProvider();
public IMergeQueryCompilerContext MergeQueryCompilerContext { get; }
//private readonly IShardingScopeFactory _shardingScopeFactory;
@ -96,6 +102,7 @@ namespace ShardingCore.Sharding
typeof(ITrackerManager<>).GetGenericType0(mergeQueryCompilerContext.GetShardingDbContextType()));
_shardingEntityConfigOptions = ShardingContainer.GetRequiredShardingEntityConfigOption(mergeQueryCompilerContext.GetShardingDbContextType());
_notSupportShardingProvider = ShardingContainer.GetService<INotSupportShardingProvider>() ?? _defaultNotSupportShardingProvider;
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
}
public void ReSetOrders(IEnumerable<PropertyOrder> orders)
@ -297,6 +304,20 @@ namespace ShardingCore.Sharding
{
return _shardingEntityConfigOptions.ThrowIfQueryRouteNotMatch;
}
private bool? _isUnSupport;
public bool IsUnSupportSharding()
{
if (!_isUnSupport.HasValue)
{
_isUnSupport = _notSupportShardingProvider.IsNotSupportSharding(MergeQueryCompilerContext);
if (_isUnSupport.Value)
{
_notSupportShardingProvider.CheckNotSupportSharding(MergeQueryCompilerContext);
}
}
return _isUnSupport.Value;
}
public void Dispose()
{
foreach (var dbContext in _parallelDbContexts.Keys)

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Visitors
{
internal class QueryableUnionDiscoverVisitor:ExpressionVisitor
{
public bool IsUnion { get; private set; }
protected override Expression VisitMethodCall(MethodCallExpression node)
{
if (node.Method.Name == nameof(Queryable.Union))
{
IsUnion = true;
}
return base.VisitMethodCall(node);
}
}
}