From bba0bb928464a43eda9c6ef866028ca8305f553f Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Sun, 3 Jul 2022 22:26:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E9=A6=96=E6=AC=A1=E5=B0=86fi?= =?UTF-8?q?rst=20or=20default=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Controllers/WeatherForecastController.cs | 2 + samples/Sample.MySql/Startup.cs | 4 +- .../Extensions/IShardingQueryableExtension.cs | 8 - .../Sharding/Abstractions/IMergeContext.cs | 13 ++ .../OrderStreamMergeAsyncEnumerator.cs | 1 - .../MergeContexts/QueryableParseEngine.cs | 4 +- .../MergeContexts/QueryableRewriteEngine.cs | 29 +++- .../AppendOrderSequenceStreamEnumerable.cs | 2 +- ...stOrDefaultSkipAsyncInMemoryMergeEngine.cs | 143 +++++++++-------- .../FirstSkipAsyncInMemoryMergeEngine.cs | 151 +++++++++--------- .../Sharding/ReWrite/ReWriteEngine.cs | 99 ------------ .../Sharding/ReWrite/ReWriteResult.cs | 37 ----- .../IMergeQueryCompilerContext.cs | 4 +- .../Abstractions/IQueryCompilerContext.cs | 1 + .../DefaultShardingQueryExecutor.cs | 7 +- .../MergeQueryCompilerContext.cs | 29 ++-- .../ShardingExecutors/QueryCompilerContext.cs | 32 +++- .../Sharding/StreamMergeContext.cs | 4 +- .../Visitors/QueryableExtraDiscoverVisitor.cs | 12 +- .../TableExists/MySqlTableEnsureManager.cs | 2 +- .../SqlServerTableEnsureManager.cs | 5 +- 21 files changed, 262 insertions(+), 327 deletions(-) create mode 100644 src/ShardingCore/Sharding/Abstractions/IMergeContext.cs delete mode 100644 src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs delete mode 100644 src/ShardingCore/Sharding/ReWrite/ReWriteResult.cs diff --git a/samples/Sample.MySql/Controllers/WeatherForecastController.cs b/samples/Sample.MySql/Controllers/WeatherForecastController.cs index 2a0449f1..9bc095c3 100644 --- a/samples/Sample.MySql/Controllers/WeatherForecastController.cs +++ b/samples/Sample.MySql/Controllers/WeatherForecastController.cs @@ -26,6 +26,8 @@ namespace Sample.MySql.Controllers [HttpGet] public async Task Get() { + var resultX = await _defaultTableDbContext.Set().Where(o => o.Id == "2" || o.Id == "3").FirstOrDefaultAsync(); + var resultY = await _defaultTableDbContext.Set().FirstOrDefaultAsync(o => o.Id == "2" || o.Id == "3"); var result = await _defaultTableDbContext.Set().AnyAsync(); var result1 = await _defaultTableDbContext.Set().Where(o => o.Id == "2" || o.Id == "3").ToListAsync(); var result2 = await _defaultTableDbContext.Set().Skip(1).Take(10).ToListAsync(); diff --git a/samples/Sample.MySql/Startup.cs b/samples/Sample.MySql/Startup.cs index 2d1f9d69..b5f5f5a6 100644 --- a/samples/Sample.MySql/Startup.cs +++ b/samples/Sample.MySql/Startup.cs @@ -7,6 +7,8 @@ using ShardingCore.Bootstrappers; using ShardingCore.Core; using ShardingCore.Core.RuntimeContexts; using ShardingCore.Extensions; +using ShardingCore.TableExists; +using ShardingCore.TableExists.Abstractions; namespace Sample.MySql { @@ -59,7 +61,7 @@ namespace Sample.MySql builder.UseMySql(connection, new MySqlServerVersion(new Version())).UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking).UseLoggerFactory(efLogger); }); o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=dbdbd0;userid=root;password=root;"); - }) + }).ReplaceService(ServiceLifetime.Singleton) .Build(sp); stopwatch.Stop(); Console.WriteLine("ShardingRuntimeContext build:"+stopwatch.ElapsedMilliseconds); diff --git a/src/ShardingCore/Extensions/IShardingQueryableExtension.cs b/src/ShardingCore/Extensions/IShardingQueryableExtension.cs index 0f1db321..0717f5b5 100644 --- a/src/ShardingCore/Extensions/IShardingQueryableExtension.cs +++ b/src/ShardingCore/Extensions/IShardingQueryableExtension.cs @@ -27,14 +27,6 @@ namespace ShardingCore.Extensions private static readonly MethodInfo QueryableTakeMethod = typeof(Queryable).GetMethods().First( m => m.Name == nameof(Queryable.Take) && m.GetParameters().Length == 2 && m.GetParameters()[1].ParameterType == typeof(int)); - internal static ExtraEntry GetExtraEntry(this IQueryable source) - { - var extraVisitor = new QueryableExtraDiscoverVisitor(); - extraVisitor.Visit(source.Expression); - var extraEntry = new ExtraEntry(extraVisitor.GetPaginationContext().Skip, extraVisitor.GetPaginationContext().Take, extraVisitor.GetOrderByContext().PropertyOrders,extraVisitor.GetSelectContext(),extraVisitor.GetGroupByContext()); - extraEntry.ProcessGroupBySelectProperties(); - return extraEntry; - } /// /// 删除Skip表达式 /// diff --git a/src/ShardingCore/Sharding/Abstractions/IMergeContext.cs b/src/ShardingCore/Sharding/Abstractions/IMergeContext.cs new file mode 100644 index 00000000..904013b2 --- /dev/null +++ b/src/ShardingCore/Sharding/Abstractions/IMergeContext.cs @@ -0,0 +1,13 @@ +using System.Linq; + +namespace ShardingCore.Sharding.Abstractions +{ + + public interface IMergeContext + { + IQueryable GetCombineQueryable(); + IQueryable GetRewriteQueryable(); + int? GetSkip(); + int? GetTake(); + } +} diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs index 3e56fab8..b2a542a3 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using ShardingCore.Core.Internal.StreamMerge; using ShardingCore.Extensions; namespace ShardingCore.Sharding.Enumerators diff --git a/src/ShardingCore/Sharding/MergeContexts/QueryableParseEngine.cs b/src/ShardingCore/Sharding/MergeContexts/QueryableParseEngine.cs index 29ac898d..5cdc04b3 100644 --- a/src/ShardingCore/Sharding/MergeContexts/QueryableParseEngine.cs +++ b/src/ShardingCore/Sharding/MergeContexts/QueryableParseEngine.cs @@ -16,9 +16,9 @@ namespace ShardingCore.Sharding.MergeContexts public IParseResult Parse(IMergeQueryCompilerContext mergeQueryCompilerContext) { var isEnumerableQuery = mergeQueryCompilerContext.IsEnumerableQuery(); - string queryMethodName = isEnumerableQuery ? null : mergeQueryCompilerContext.QueryMethodName(); + string queryMethodName = mergeQueryCompilerContext.GetQueryMethodName(); var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable(); - var queryableExtraDiscoverVisitor = new QueryableExtraDiscoverVisitor(); + var queryableExtraDiscoverVisitor = new QueryableExtraDiscoverVisitor(mergeQueryCompilerContext); queryableExtraDiscoverVisitor.Visit(combineQueryable.Expression); return new ParseResult(queryableExtraDiscoverVisitor.GetPaginationContext(), queryableExtraDiscoverVisitor.GetOrderByContext(), queryableExtraDiscoverVisitor.GetSelectContext(), diff --git a/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs b/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs index edc66193..322a906b 100644 --- a/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs +++ b/src/ShardingCore/Sharding/MergeContexts/QueryableRewriteEngine.cs @@ -21,11 +21,12 @@ namespace ShardingCore.Sharding.MergeContexts public sealed class QueryableRewriteEngine : IQueryableRewriteEngine { private static readonly ISet singleEntityMethodNames = new HashSet(); + private static readonly ISet supportSingleEntityMethodNames = new HashSet(); static QueryableRewriteEngine() { - singleEntityMethodNames.Add(nameof(Enumerable.First)); - singleEntityMethodNames.Add(nameof(Enumerable.FirstOrDefault)); + supportSingleEntityMethodNames.Add(nameof(Enumerable.First)); + supportSingleEntityMethodNames.Add(nameof(Enumerable.FirstOrDefault)); singleEntityMethodNames.Add(nameof(Enumerable.Last)); singleEntityMethodNames.Add(nameof(Enumerable.LastOrDefault)); singleEntityMethodNames.Add(nameof(Enumerable.Single)); @@ -46,7 +47,7 @@ namespace ShardingCore.Sharding.MergeContexts { if (!mergeQueryCompilerContext.IsEnumerableQuery()) { - var queryMethodName = mergeQueryCompilerContext.QueryMethodName(); + var queryMethodName = mergeQueryCompilerContext.GetQueryMethodName(); if (singleEntityMethodNames.Contains(queryMethodName)) { //todo 修复做兼容 @@ -69,17 +70,33 @@ namespace ShardingCore.Sharding.MergeContexts reWriteQueryable = reWriteQueryable.RemoveSkip(); } - if (take.HasValue) + //如果是first or default + var fixedTake = mergeQueryCompilerContext.GetFixedTake(); + if (fixedTake.HasValue) { if (skip.HasValue) { - reWriteQueryable = reWriteQueryable.ReSkip(0).ReTake(take.Value + skip.GetValueOrDefault()); + reWriteQueryable = reWriteQueryable.ReSkip(0).ReTake(fixedTake.Value + skip.GetValueOrDefault()); } else { - reWriteQueryable = reWriteQueryable.ReTake(take.Value + skip.GetValueOrDefault()); + reWriteQueryable = reWriteQueryable.ReTake(fixedTake.Value); } } + else + { + if (take.HasValue) + { + if (skip.HasValue) + { + reWriteQueryable = reWriteQueryable.ReSkip(0).ReTake(take.Value + skip.GetValueOrDefault()); + } + else + { + reWriteQueryable = reWriteQueryable.ReTake(take.Value + skip.GetValueOrDefault()); + } + } + } //包含group by if (groupByContext.GroupExpression != null) { diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/AppendOrderSequenceStreamEnumerable.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/AppendOrderSequenceStreamEnumerable.cs index f5f0269f..dc6ac36b 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/AppendOrderSequenceStreamEnumerable.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/AppendOrderSequenceStreamEnumerable.cs @@ -107,7 +107,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList(); - GetStreamMergeContext().ReSetOrders(reSetOrders); + GetStreamMergeContext().ReSetOrders(reSetOrders.ToArray()); return sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult)); } diff --git a/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs index d085c1dd..30759738 100644 --- a/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs @@ -1,68 +1,75 @@ -// using System.Collections.Generic; -// using System.Linq; -// using System.Threading; -// using System.Threading.Tasks; -// using ShardingCore.Extensions; -// using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; -// using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines; -// -// namespace ShardingCore.Sharding.MergeEngines -// { -// /* -// * @Author: xjm -// * @Description: -// * @Date: 2021/8/17 15:16:36 -// * @Ver: 1.0 -// * @Email: 326308290@qq.com -// */ -// internal class FirstOrDefaultSkipAsyncInMemoryMergeEngine : IEnsureMergeResult -// { -// private readonly StreamMergeContext _streamMergeContext; -// -// public FirstOrDefaultSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) -// { -// _streamMergeContext = streamMergeContext; -// } -// // protected override IExecutor> CreateExecutor0(bool async) -// // { -// // return new FirstOrDefaultMethodExecutor(GetStreamMergeContext()); -// // } -// // -// // protected override TEntity DoMergeResult0(List> resultList) -// // { -// // var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList(); -// // -// // if (notNullResult.IsEmpty()) -// // return default; -// // -// // var streamMergeContext = GetStreamMergeContext(); -// // if (streamMergeContext.Orders.Any()) -// // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault(); -// // -// // return notNullResult.FirstOrDefault(); -// // } -// public TEntity MergeResult() -// { -// return MergeResultAsync().WaitAndUnwrapException(false); -// } -// -// public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// -// //将toke改成1 -// var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); -// -// var list = new List(); -// await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken)) -// { -// list.Add(element); -// } -// -// if (list.IsEmpty()) -// { -// return default; -// } -// return list.FirstOrDefault(); -// } -// } -// } +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using ShardingCore.Extensions; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; +using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines; + +namespace ShardingCore.Sharding.MergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/8/17 15:16:36 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + internal class FirstOrDefaultSkipAsyncInMemoryMergeEngine : IEnsureMergeResult + { + private readonly StreamMergeContext _streamMergeContext; + + public FirstOrDefaultSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) + { + _streamMergeContext = streamMergeContext; + } + + // protected override IExecutor> CreateExecutor0(bool async) + // { + // return new FirstOrDefaultMethodExecutor(GetStreamMergeContext()); + // } + // + // protected override TEntity DoMergeResult0(List> resultList) + // { + // var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList(); + // + // if (notNullResult.IsEmpty()) + // return default; + // + // var streamMergeContext = GetStreamMergeContext(); + // if (streamMergeContext.Orders.Any()) + // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault(); + // + // return notNullResult.FirstOrDefault(); + // } + public TEntity MergeResult() + { + return MergeResultAsync().WaitAndUnwrapException(false); + } + + public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + //将toke改成1 + var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); + +#if EFCORE2 + var list = await asyncEnumeratorStreamMergeEngine.ToList(cancellationToken); +#endif +#if !EFCORE2 + var take = _streamMergeContext.GetTake(); + var list = new List(take??31); + await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken)) + { + list.Add(element); + } +#endif + + if (list.IsEmpty()) + { + return default; + } + + return list.FirstOrDefault(); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/MergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs index 09b53ccf..fcb8a581 100644 --- a/src/ShardingCore/Sharding/MergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs @@ -1,73 +1,78 @@ -// using System; -// using System.Collections.Generic; -// using System.Linq; -// using System.Threading; -// using System.Threading.Tasks; -// using ShardingCore.Extensions; -// using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; -// using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines; -// -// namespace ShardingCore.Sharding.MergeEngines -// { -// -// public class FirstSkipAsyncInMemoryMergeEngine : IEnsureMergeResult -// { -// private readonly StreamMergeContext _streamMergeContext; -// -// public FirstSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) -// { -// _streamMergeContext = streamMergeContext; -// } -// // protected override IExecutor> CreateExecutor0(bool async) -// // { -// // return new FirstOrDefaultMethodExecutor(GetStreamMergeContext()); -// // } -// // -// // protected override TEntity DoMergeResult0(List> resultList) -// // { -// // var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList(); -// // -// // if (notNullResult.IsEmpty()) -// // return default; -// // -// // var streamMergeContext = GetStreamMergeContext(); -// // if (streamMergeContext.Orders.Any()) -// // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault(); -// // -// // return notNullResult.FirstOrDefault(); -// // } -// public TEntity MergeResult() -// { -// return MergeResultAsync().WaitAndUnwrapException(false); -// } -// -// public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) -// { -// -// //将toke改成1 -// var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); -// -// var list = new List(); -// await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken)) -// { -// list.Add(element); -// } -// -// if (list.IsEmpty()) -// throw new InvalidOperationException("Sequence contains no elements."); -// -// return list.First(); -// } -// -// -// // if (notNullResult.IsEmpty()) -// // throw new InvalidOperationException("Sequence contains no elements."); -// // -// // var streamMergeContext = GetStreamMergeContext(); -// // if (streamMergeContext.Orders.Any()) -// // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).First(); -// // -// // return notNullResult.First(); -// } -// } -// +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using ShardingCore.Extensions; +using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge; +using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines; + +namespace ShardingCore.Sharding.MergeEngines +{ + + public class FirstSkipAsyncInMemoryMergeEngine : IEnsureMergeResult + { + private readonly StreamMergeContext _streamMergeContext; + + public FirstSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) + { + _streamMergeContext = streamMergeContext; + } + // protected override IExecutor> CreateExecutor0(bool async) + // { + // return new FirstOrDefaultMethodExecutor(GetStreamMergeContext()); + // } + // + // protected override TEntity DoMergeResult0(List> resultList) + // { + // var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList(); + // + // if (notNullResult.IsEmpty()) + // return default; + // + // var streamMergeContext = GetStreamMergeContext(); + // if (streamMergeContext.Orders.Any()) + // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault(); + // + // return notNullResult.FirstOrDefault(); + // } + public TEntity MergeResult() + { + return MergeResultAsync().WaitAndUnwrapException(false); + } + + public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + + //将toke改成1 + var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); + +#if EFCORE2 + var list = await asyncEnumeratorStreamMergeEngine.ToList(cancellationToken); +#endif +#if !EFCORE2 + var take = _streamMergeContext.GetTake(); + var list = new List(take??31); + await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken)) + { + list.Add(element); + } +#endif + if (list.IsEmpty()) + throw new InvalidOperationException("Sequence contains no elements."); + + return list.First(); + } + + + // if (notNullResult.IsEmpty()) + // throw new InvalidOperationException("Sequence contains no elements."); + // + // var streamMergeContext = GetStreamMergeContext(); + // if (streamMergeContext.Orders.Any()) + // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).First(); + // + // return notNullResult.First(); + } +} + diff --git a/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs b/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs deleted file mode 100644 index 477648c1..00000000 --- a/src/ShardingCore/Sharding/ReWrite/ReWriteEngine.cs +++ /dev/null @@ -1,99 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using Microsoft.Extensions.DependencyInjection; -using ShardingCore.Core.EntityMetadatas; -using ShardingCore.Core.Internal.Visitors; -using ShardingCore.Exceptions; -using ShardingCore.Extensions; -using ShardingCore.Extensions.InternalExtensions; -using ShardingCore.Sharding.MergeContexts; -using ShardingCore.Sharding.Visitors.Selects; - -namespace ShardingCore.Core.Internal.StreamMerge.ReWrite -{ -/* -* @Author: xjm -* @Description: -* @Date: Thursday, 28 January 2021 23:44:24 -* @Email: 326308290@qq.com -*/ - internal class ReWriteEngine - { - private readonly IQueryable _queryable; - - public ReWriteEngine(IQueryable queryable) - { - _queryable = queryable; - } - - public ReWriteResult ReWrite() - { - var extraEntry = _queryable.GetExtraEntry(); - var skip = extraEntry.Skip; - var take = extraEntry.Take; - var orders = extraEntry.Orders ?? Enumerable.Empty(); - - //去除分页,获取前Take+Skip数量 - var reWriteQueryable = _queryable; - if (take.HasValue) - { - reWriteQueryable = reWriteQueryable.RemoveTake().As>(); - } - - if (skip.HasValue) - { - reWriteQueryable = reWriteQueryable.RemoveSkip().As>(); - } - - if (take.HasValue) - { - if (skip.HasValue) - { - reWriteQueryable = reWriteQueryable.Skip(0).Take(take.Value + skip.GetValueOrDefault()); - } - else - { - reWriteQueryable = reWriteQueryable.Take(take.Value + skip.GetValueOrDefault()); - } - } - //包含group by - if (extraEntry.GroupByContext.GroupExpression != null) - { - if (orders.IsEmpty()) - { - //将查询的属性转换成order by - var selectProperties = extraEntry.SelectContext.SelectProperties.Where(o => !(o is SelectAggregateProperty)); - if (selectProperties.IsNotEmpty()) - { - var sort = string.Join(",",selectProperties.Select(o=>$"{o.PropertyName} asc")); - reWriteQueryable = reWriteQueryable.OrderWithExpression(sort,null); - var reWriteOrders = new List(selectProperties.Count()); - foreach (var orderProperty in selectProperties) - { - reWriteOrders.Add(new PropertyOrder(orderProperty.PropertyName,true, orderProperty.OwnerType)); - } - orders = reWriteOrders; - } - } - else - { - //将查询的属性转换成order by 并且order和select的未聚合查询必须一致 - var selectProperties = extraEntry.SelectContext.SelectProperties.Where(o => !(o is SelectAggregateProperty)); - - if (orders.Count() != selectProperties.Count()) - throw new ShardingCoreInvalidOperationException("group by query order items not equal select un-aggregate items"); - var os=orders.Select(o => o.PropertyExpression).ToList(); - var ss = selectProperties.Select(o => o.PropertyName).ToList(); - for (int i = 0; i < os.Count(); i++) - { - if(!os[i].Equals(ss[i])) - throw new ShardingCoreInvalidOperationException($"group by query order items not equal select un-aggregate items: order:[{os[i]}],select:[{ss[i]}"); - } - } - } - return new ReWriteResult(_queryable,reWriteQueryable,skip,take,orders,extraEntry.SelectContext,extraEntry.GroupByContext); - } - - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/ReWrite/ReWriteResult.cs b/src/ShardingCore/Sharding/ReWrite/ReWriteResult.cs deleted file mode 100644 index e96df6a2..00000000 --- a/src/ShardingCore/Sharding/ReWrite/ReWriteResult.cs +++ /dev/null @@ -1,37 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using ShardingCore.Core.Internal.Visitors; -using ShardingCore.Core.Internal.Visitors.Selects; -using ShardingCore.Sharding.MergeContexts; - -namespace ShardingCore.Core.Internal.StreamMerge.ReWrite -{ -/* -* @Author: xjm -* @Description: -* @Date: Thursday, 28 January 2021 23:45:29 -* @Email: 326308290@qq.com -*/ - internal class ReWriteResult - { - public ReWriteResult(IQueryable originalQueryable, IQueryable reWriteQueryable, int? skip, int? take, IEnumerable orders, SelectContext selectContext, GroupByContext groupByContext) - { - OriginalQueryable = originalQueryable; - ReWriteQueryable = reWriteQueryable; - Skip = skip; - Take = take; - Orders = orders; - SelectContext = selectContext; - GroupByContext = groupByContext; - } - - public IQueryable OriginalQueryable { get; } - public IQueryable ReWriteQueryable { get; } - public int? Skip { get; } - public int? Take { get; } - public IEnumerable Orders { get; } - public SelectContext SelectContext { get; } - public GroupByContext GroupByContext { get; } - } -} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IMergeQueryCompilerContext.cs b/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IMergeQueryCompilerContext.cs index ca2f5de8..3c272b85 100644 --- a/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IMergeQueryCompilerContext.cs +++ b/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IMergeQueryCompilerContext.cs @@ -18,8 +18,6 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions bool IsCrossTable(); bool IsCrossDataSource(); - - string QueryMethodName(); - //bool IsEnumerableQuery(); + int? GetFixedTake(); } } diff --git a/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IQueryCompilerContext.cs b/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IQueryCompilerContext.cs index 857ee198..5117d98c 100644 --- a/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IQueryCompilerContext.cs +++ b/src/ShardingCore/Sharding/ShardingExecutors/Abstractions/IQueryCompilerContext.cs @@ -30,6 +30,7 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions /// QueryCompilerExecutor GetQueryCompilerExecutor(); bool IsEnumerableQuery(); + string GetQueryMethodName(); /// /// 当前是否读写分离走读库(包括是否启用读写分离和是否当前的dbcontext启用了读库查询) diff --git a/src/ShardingCore/Sharding/ShardingExecutors/DefaultShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingExecutors/DefaultShardingQueryExecutor.cs index 3cefb89a..5bc0a1c9 100644 --- a/src/ShardingCore/Sharding/ShardingExecutors/DefaultShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingExecutors/DefaultShardingQueryExecutor.cs @@ -15,6 +15,7 @@ using Microsoft.Extensions.Logging; using ShardingCore.Core; using ShardingCore.Extensions.InternalExtensions; using ShardingCore.Logger; +using ShardingCore.Sharding.MergeEngines; #if EFCORE2 using Microsoft.EntityFrameworkCore.Internal; #endif @@ -70,13 +71,13 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors } private TResult DoExecute(IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken = new CancellationToken()) { - var queryMethodName = mergeQueryCompilerContext.QueryMethodName(); + var queryMethodName = mergeQueryCompilerContext.GetQueryMethodName(); switch (queryMethodName) { case nameof(Enumerable.First): - return EnsureResultTypeMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); + return EnsureResultTypeMergeExecute(typeof(FirstSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); case nameof(Enumerable.FirstOrDefault): - return EnsureResultTypeMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); + return EnsureResultTypeMergeExecute(typeof(FirstOrDefaultSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); case nameof(Enumerable.Last): return EnsureResultTypeMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); case nameof(Enumerable.LastOrDefault): diff --git a/src/ShardingCore/Sharding/ShardingExecutors/MergeQueryCompilerContext.cs b/src/ShardingCore/Sharding/ShardingExecutors/MergeQueryCompilerContext.cs index dfc8d0a7..43128679 100644 --- a/src/ShardingCore/Sharding/ShardingExecutors/MergeQueryCompilerContext.cs +++ b/src/ShardingCore/Sharding/ShardingExecutors/MergeQueryCompilerContext.cs @@ -49,6 +49,7 @@ namespace ShardingCore.Sharding.ShardingExecutors private QueryCompilerExecutor _queryCompilerExecutor; private bool? hasQueryCompilerExecutor; + private readonly int? _fixedTake; private MergeQueryCompilerContext(IShardingRuntimeContext shardingRuntimeContext,IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult, ShardingRouteResult shardingRouteResult) { _shardingRuntimeContext = shardingRuntimeContext; @@ -60,6 +61,11 @@ namespace ShardingCore.Sharding.ShardingExecutors _isCrossDataSource = shardingRouteResult.IsCrossDataSource; _isCrossTable = shardingRouteResult.IsCrossTable; _existCrossTableTails = shardingRouteResult.ExistCrossTableTails; + var queryMethodName = queryCompilerContext.GetQueryMethodName(); + if (nameof(Enumerable.First) == queryMethodName || nameof(Enumerable.FirstOrDefault) == queryMethodName) + { + _fixedTake = 1; + } } // // private IEnumerable GetTableRouteResults(IEnumerable tableRouteResults) @@ -208,6 +214,11 @@ namespace ShardingCore.Sharding.ShardingExecutors return _queryCompilerContext.IsEnumerableQuery(); } + public string GetQueryMethodName() + { + return _queryCompilerContext.GetQueryMethodName(); + } + /// /// 如果需要聚合并且存在跨tail的查询或者本次是读链接 /// @@ -217,23 +228,9 @@ namespace ShardingCore.Sharding.ShardingExecutors return _isCrossTable || _existCrossTableTails|| _queryCompilerContext.IsParallelQuery(); } - public string QueryMethodName() + public int? GetFixedTake() { - if (IsEnumerableQuery()) - { - throw new ShardingCoreInvalidOperationException( - $"queryable:[{GetQueryExpression().ShardingPrint()}] is enumerable query cant found query method name"); - } - - if (GetQueryExpression() is MethodCallExpression methodCallExpression) - { - return methodCallExpression.Method.Name; - } - else - { - throw new ShardingCoreInvalidOperationException( - $"queryable:[{GetQueryExpression().ShardingPrint()}] not {nameof(MethodCallExpression)} cant found query method name"); - } + return _fixedTake; } } } diff --git a/src/ShardingCore/Sharding/ShardingExecutors/QueryCompilerContext.cs b/src/ShardingCore/Sharding/ShardingExecutors/QueryCompilerContext.cs index daa66130..a69e800d 100644 --- a/src/ShardingCore/Sharding/ShardingExecutors/QueryCompilerContext.cs +++ b/src/ShardingCore/Sharding/ShardingExecutors/QueryCompilerContext.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Linq.Expressions; using ShardingCore.Core; using ShardingCore.Core.RuntimeContexts; +using ShardingCore.Exceptions; using ShardingCore.Sharding.Parsers; using ShardingCore.Sharding.Parsers.Abstractions; @@ -18,6 +19,7 @@ namespace ShardingCore.Sharding.ShardingExecutors { public class QueryCompilerContext: IQueryCompilerContext { + public const string ENUMERABLE = "Enumerable"; private readonly Dictionary _queryEntities; private readonly IShardingDbContext _shardingDbContext; private readonly IShardingRuntimeContext _shardingRuntimeContext; @@ -33,6 +35,7 @@ namespace ShardingCore.Sharding.ShardingExecutors private readonly ConnectionModeEnum? _connectionMode; private readonly bool? _isSequence; private readonly bool? _sameWithShardingComparer; + private readonly string _queryMethodName; private QueryCompilerContext(IPrepareParseResult prepareParseResult) { @@ -47,13 +50,32 @@ namespace ShardingCore.Sharding.ShardingExecutors _maxQueryConnectionsLimit = prepareParseResult.GetMaxQueryConnectionsLimit(); _connectionMode = prepareParseResult.GetConnectionMode(); _entityMetadataManager = _shardingRuntimeContext.GetEntityMetadataManager(); - + _queryMethodName = QueryMethodName(_queryExpression); //原生对象的原生查询如果是读写分离就需要启用并行查询 _isParallelQuery = prepareParseResult.ReadOnly().GetValueOrDefault(); _isSequence = prepareParseResult.IsSequence(); _sameWithShardingComparer = prepareParseResult.SameWithShardingComparer(); } + private string QueryMethodName(Expression queryExpression) + { + var isEnumerableQuery = queryExpression.Type + .HasImplementedRawGeneric(typeof(IQueryable<>)); + if (isEnumerableQuery) + { + return ENUMERABLE; + } + + if (queryExpression is MethodCallExpression methodCallExpression) + { + return methodCallExpression.Method.Name; + } + else + { + throw new ShardingCoreInvalidOperationException( + $"queryable:[{queryExpression.ShardingPrint()}] not {nameof(MethodCallExpression)} cant found query method name"); + } + } public static QueryCompilerContext Create(IPrepareParseResult prepareParseResult) { return new QueryCompilerContext(prepareParseResult); @@ -159,8 +181,12 @@ namespace ShardingCore.Sharding.ShardingExecutors public bool IsEnumerableQuery() { - return _queryExpression.Type - .HasImplementedRawGeneric(typeof(IQueryable<>)); + return ENUMERABLE == _queryMethodName; + } + + public string GetQueryMethodName() + { + return _queryMethodName; } } } diff --git a/src/ShardingCore/Sharding/StreamMergeContext.cs b/src/ShardingCore/Sharding/StreamMergeContext.cs index 2274ed0e..5771ac3a 100644 --- a/src/ShardingCore/Sharding/StreamMergeContext.cs +++ b/src/ShardingCore/Sharding/StreamMergeContext.cs @@ -49,7 +49,7 @@ namespace ShardingCore.Sharding public int? Skip { get; private set; } public int? Take { get; private set;} - public IEnumerable Orders { get; private set; } + public PropertyOrder[] Orders { get; private set; } public SelectContext SelectContext => ParseResult.GetSelectContext(); public GroupByContext GroupByContext => ParseResult.GetGroupByContext(); @@ -105,7 +105,7 @@ namespace ShardingCore.Sharding Take = parseResult.GetPaginationContext().Take; } - public void ReSetOrders(IEnumerable orders) + public void ReSetOrders(PropertyOrder[] orders) { Orders = orders; } diff --git a/src/ShardingCore/Sharding/Visitors/QueryableExtraDiscoverVisitor.cs b/src/ShardingCore/Sharding/Visitors/QueryableExtraDiscoverVisitor.cs index 8ee0261c..1c718fec 100644 --- a/src/ShardingCore/Sharding/Visitors/QueryableExtraDiscoverVisitor.cs +++ b/src/ShardingCore/Sharding/Visitors/QueryableExtraDiscoverVisitor.cs @@ -6,6 +6,7 @@ using ShardingCore.Core.Internal.Visitors.Selects; using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.MergeContexts; +using ShardingCore.Sharding.ShardingExecutors.Abstractions; using ShardingCore.Sharding.Visitors.Selects; namespace ShardingCore.Core.Internal.Visitors @@ -18,12 +19,16 @@ namespace ShardingCore.Core.Internal.Visitors */ internal class QueryableExtraDiscoverVisitor : ShardingExpressionVisitor { + private readonly IMergeQueryCompilerContext _mergeQueryCompilerContext; private GroupByContext _groupByContext = new GroupByContext(); private SelectContext _selectContext = new SelectContext(); private PaginationContext _paginationContext = new PaginationContext(); private OrderByContext _orderByContext = new OrderByContext(); - + public QueryableExtraDiscoverVisitor(IMergeQueryCompilerContext mergeQueryCompilerContext) + { + _mergeQueryCompilerContext = mergeQueryCompilerContext; + } public SelectContext GetSelectContext() { return _selectContext; @@ -36,6 +41,11 @@ namespace ShardingCore.Core.Internal.Visitors public PaginationContext GetPaginationContext() { + var fixedTake = _mergeQueryCompilerContext.GetFixedTake(); + if (fixedTake.HasValue) + { + _paginationContext.Take = fixedTake.Value; + } return _paginationContext; } public OrderByContext GetOrderByContext() diff --git a/src/ShardingCore/TableExists/MySqlTableEnsureManager.cs b/src/ShardingCore/TableExists/MySqlTableEnsureManager.cs index 293dc64a..050236fd 100644 --- a/src/ShardingCore/TableExists/MySqlTableEnsureManager.cs +++ b/src/ShardingCore/TableExists/MySqlTableEnsureManager.cs @@ -24,7 +24,7 @@ namespace ShardingCore.TableExists public override ISet DoGetExistTables(DbConnection connection, string dataSourceName) { var database = connection.Database; - ISet result = new HashSet(); + ISet result = new HashSet(StringComparer.OrdinalIgnoreCase); using (var dataTable = connection.GetSchema(Tables)) { for (int i = 0; i < dataTable.Rows.Count; i++) diff --git a/src/ShardingCore/TableExists/SqlServerTableEnsureManager.cs b/src/ShardingCore/TableExists/SqlServerTableEnsureManager.cs index 4d7d3f7f..469c99e7 100644 --- a/src/ShardingCore/TableExists/SqlServerTableEnsureManager.cs +++ b/src/ShardingCore/TableExists/SqlServerTableEnsureManager.cs @@ -1,4 +1,5 @@ -using Microsoft.EntityFrameworkCore; +using System; +using Microsoft.EntityFrameworkCore; using ShardingCore.Sharding.Abstractions; using ShardingCore.TableExists.Abstractions; using System.Collections.Generic; @@ -17,7 +18,7 @@ namespace ShardingCore.TableExists public override ISet DoGetExistTables(DbConnection connection, string dataSourceName) { - ISet result = new HashSet(); + ISet result = new HashSet(StringComparer.OrdinalIgnoreCase); using (var dataTable = connection.GetSchema(Tables)) { for (int i = 0; i < dataTable.Rows.Count; i++)