增加对skip+first的使用提示

This commit is contained in:
xuejiaming 2022-07-03 09:23:34 +08:00
parent 2db46f1e9a
commit 2d4b9b0779
14 changed files with 157 additions and 33 deletions

View File

@ -20,6 +20,18 @@ namespace ShardingCore.Sharding.MergeContexts
{
public sealed class QueryableRewriteEngine : IQueryableRewriteEngine
{
private static readonly ISet<string> singleEntityMethodNames = new HashSet<string>();
static QueryableRewriteEngine()
{
singleEntityMethodNames.Add(nameof(Enumerable.First));
singleEntityMethodNames.Add(nameof(Enumerable.FirstOrDefault));
singleEntityMethodNames.Add(nameof(Enumerable.Last));
singleEntityMethodNames.Add(nameof(Enumerable.LastOrDefault));
singleEntityMethodNames.Add(nameof(Enumerable.Single));
singleEntityMethodNames.Add(nameof(Enumerable.SingleOrDefault));
}
public IQueryable GetRewriteQueryable(IMergeQueryCompilerContext mergeQueryCompilerContext, IParseResult parseResult)
{
var paginationContext = parseResult.GetPaginationContext();
@ -30,6 +42,20 @@ namespace ShardingCore.Sharding.MergeContexts
var take = paginationContext.Take;
var orders = orderByContext.PropertyOrders;
if (skip.HasValue && skip.Value > 0)
{
if (!mergeQueryCompilerContext.IsEnumerableQuery())
{
var queryMethodName = mergeQueryCompilerContext.QueryMethodName();
if (singleEntityMethodNames.Contains(queryMethodName))
{
//todo 修复做兼容
throw new ShardingCoreInvalidOperationException(
$"single query:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] cant use skip:{skip.Value},u should use {nameof(Enumerable.ToList)} than use skip in {nameof(IEnumerable<object>)}");
}
}
}
//去除分页,获取前Take+Skip数量
var reWriteQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
if (take.HasValue)

View File

@ -15,8 +15,7 @@ using ShardingCore.Sharding.ShardingQueryExecutors;
*/
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
internal class AsyncEnumeratorStreamMergeEngine<TShardingDbContext,T> : IAsyncEnumerable<T>, IEnumerable<T>
where TShardingDbContext:DbContext,IShardingDbContext
internal class AsyncEnumeratorStreamMergeEngine<T> : IAsyncEnumerable<T>, IEnumerable<T>
{
private readonly StreamMergeContext _mergeContext;
@ -33,7 +32,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
return emptyQueryEnumerator;
var asyncEnumerator = EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetStreamEnumerable()
var asyncEnumerator = EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable()
.GetAsyncEnumerator(cancellationToken);
if (_mergeContext.IsUseShardingTrack(typeof(T)))
@ -51,7 +50,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
return emptyQueryEnumerator;
var asyncEnumerator = ((IAsyncEnumerable<T>)EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetStreamEnumerable())
var asyncEnumerator = ((IAsyncEnumerable<T>)EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable())
.GetEnumerator();
if (_mergeContext.IsUseShardingTrack(typeof(T)))
{
@ -67,7 +66,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
var emptyQueryEnumerator = _mergeContext.PreperExecute(() => new EmptyQueryEnumerator<T>());
if (emptyQueryEnumerator != null)
return emptyQueryEnumerator;
var enumerator = ((IEnumerable<T>)EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetStreamEnumerable())
var enumerator = ((IEnumerable<T>)EnumeratorStreamMergeEngineFactory<T>.Create(_mergeContext).GetStreamEnumerable())
.GetEnumerator();
if (_mergeContext.IsUseShardingTrack(typeof(T)))

View File

@ -26,8 +26,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class AppendOrderSequenceStreamEnumerable<TShardingDbContext, TEntity> : AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class AppendOrderSequenceStreamEnumerable<TEntity> : AbstractStreamEnumerable<TEntity>
{
private readonly PaginationSequenceConfig _dataSourceSequenceOrderConfig;
private readonly PaginationSequenceConfig _tableSequenceOrderConfig;

View File

@ -16,8 +16,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class DefaultShardingStreamEnumerable<TShardingDbContext,TEntity> :AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class DefaultShardingStreamEnumerable<TEntity> :AbstractStreamEnumerable<TEntity>
{
public DefaultShardingStreamEnumerable(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -11,8 +11,7 @@ using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
internal class EmptyQueryStreamEnumerable<TShardingDbContext, TEntity> : AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class EmptyQueryStreamEnumerable<TEntity> : AbstractStreamEnumerable<TEntity>
{
public EmptyQueryStreamEnumerable(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -25,8 +25,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class SequenceStreamEnumerable<TShardingDbContext, TEntity> : AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class SequenceStreamEnumerable<TEntity> : AbstractStreamEnumerable<TEntity>
{
private readonly PaginationSequenceConfig _dataSourceSequenceMatchOrderConfig;
private readonly PaginationSequenceConfig _tableSequenceMatchOrderConfig;

View File

@ -19,8 +19,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
* @Date: Thursday, 02 September 2021 20:58:10
* @Email: 326308290@qq.com
*/
internal class SingleQueryStreamEnumerable<TShardingDbContext, TEntity> : AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class SingleQueryStreamEnumerable<TEntity> : AbstractStreamEnumerable<TEntity>
{
public SingleQueryStreamEnumerable(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{

View File

@ -27,8 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
internal class EnumeratorStreamMergeEngineFactory<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly IShardingPageManager _shardingPageManager;
@ -42,9 +41,9 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
_entityMetadataManager = streamMergeContext.ShardingRuntimeContext.GetEntityMetadataManager();
}
public static EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity> Create(StreamMergeContext streamMergeContext)
public static EnumeratorStreamMergeEngineFactory<TEntity> Create(StreamMergeContext streamMergeContext)
{
return new EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity>(streamMergeContext);
return new EnumeratorStreamMergeEngineFactory<TEntity>(streamMergeContext);
}
public IVirtualDataSourceRoute GetRoute(Type entityType)
@ -55,17 +54,17 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
if (_streamMergeContext.IsRouteNotMatch())
{
return new EmptyQueryStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext);
return new EmptyQueryStreamEnumerable<TEntity>(_streamMergeContext);
}
//本次查询没有跨库没有跨表就可以直接执行
if (!_streamMergeContext.IsMergeQuery())
{
return new SingleQueryStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext);
return new SingleQueryStreamEnumerable<TEntity>(_streamMergeContext);
}
if (_streamMergeContext.UseUnionAllMerge())
{
return new DefaultShardingStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext);
return new DefaultShardingStreamEnumerable<TEntity>(_streamMergeContext);
}
//未开启系统分表或者本次查询涉及多张分表
@ -94,7 +93,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
}
return new DefaultShardingStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext);
return new DefaultShardingStreamEnumerable<TEntity>(_streamMergeContext);
}
private IStreamEnumerable<TEntity> DoNoOrderAppendEnumeratorStreamMergeEngine(Type shardingEntityType)
@ -130,7 +129,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
if (useSequenceEnumeratorMergeEngine)
{
return new AppendOrderSequenceStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults);
return new AppendOrderSequenceStreamEnumerable<TEntity>(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults);
}
@ -173,7 +172,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
!_streamMergeContext.IsCrossDataSource)) || (!isShardingDataSource && isShardingTable && tableSequenceOrderConfig != null);
if (useSequenceEnumeratorMergeEngine)
{
return new SequenceStreamEnumerable<TShardingDbContext, TEntity>(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
return new SequenceStreamEnumerable<TEntity>(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc);
}
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);

View File

@ -0,0 +1,72 @@
// using System.Collections.Generic;
// using Microsoft.EntityFrameworkCore;
// using ShardingCore.Extensions;
// using ShardingCore.Sharding.Abstractions;
// using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
// using System.Linq;
// using System.Threading;
// using System.Threading.Tasks;
// using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
// using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
// using ShardingCore.Sharding.MergeEngines.Executors.Methods;
//
// namespace ShardingCore.Sharding.StreamMergeEngines
// {
// /*
// * @Author: xjm
// * @Description:
// * @Date: 2021/8/17 15:16:36
// * @Ver: 1.0
// * @Email: 326308290@qq.com
// */
// internal class FirstOrDefaultSkipAsyncInMemoryMergeEngine<TEntity> : IEnsureMergeResult<TEntity>
// {
// private readonly StreamMergeContext _streamMergeContext;
//
// public FirstOrDefaultSkipAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext)
// {
// _streamMergeContext = streamMergeContext;
// }
// // protected override IExecutor<RouteQueryResult<TEntity>> CreateExecutor0(bool async)
// // {
// // return new FirstOrDefaultMethodExecutor<TEntity>(GetStreamMergeContext());
// // }
// //
// // protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
// // {
// // var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
// //
// // if (notNullResult.IsEmpty())
// // return default;
// //
// // var streamMergeContext = GetStreamMergeContext();
// // if (streamMergeContext.Orders.Any())
// // return notNullResult.AsQueryable().OrderWithExpression(streamMergeContext.Orders, streamMergeContext.GetShardingComparer()).FirstOrDefault();
// //
// // return notNullResult.FirstOrDefault();
// // }
// public TEntity MergeResult()
// {
// return MergeResultAsync().WaitAndUnwrapException(false);
// }
//
// public async Task<TEntity> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
// {
//
// //将toke改成1
// var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
//
// var list = new List<TEntity>();
// await foreach (var element in asyncEnumeratorStreamMergeEngine.WithCancellation(cancellationToken))
// {
// list.Add(element);
// }
//
// if (list.IsEmpty())
// {
// return default;
// }
// return list.FirstOrDefault();
// }
// }
// }

View File

@ -18,6 +18,8 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
bool IsCrossTable();
bool IsCrossDataSource();
string QueryMethodName();
//bool IsEnumerableQuery();
}
}

View File

@ -70,10 +70,8 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
}
private TResult DoExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken = new CancellationToken())
{
if (mergeQueryCompilerContext.GetQueryExpression() is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
var queryMethodName = mergeQueryCompilerContext.QueryMethodName();
switch (queryMethodName)
{
case nameof(Enumerable.First):
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
@ -106,7 +104,6 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
case nameof(Enumerable.Contains):
return EnsureResultTypeMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
@ -123,8 +120,8 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
var queryEntityType = combineQueryable.ElementType;
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<,>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(mergeQueryCompilerContext.GetShardingDbContextType(), queryEntityType);
Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}

View File

@ -13,6 +13,7 @@ using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
@ -215,5 +216,24 @@ namespace ShardingCore.Sharding.ShardingExecutors
{
return _isCrossTable || _existCrossTableTails|| _queryCompilerContext.IsParallelQuery();
}
public string QueryMethodName()
{
if (IsEnumerableQuery())
{
throw new ShardingCoreInvalidOperationException(
$"queryable:[{GetQueryExpression().ShardingPrint()}] is enumerable query cant found query method name");
}
if (GetQueryExpression() is MethodCallExpression methodCallExpression)
{
return methodCallExpression.Method.Name;
}
else
{
throw new ShardingCoreInvalidOperationException(
$"queryable:[{GetQueryExpression().ShardingPrint()}] not {nameof(MethodCallExpression)} cant found query method name");
}
}
}
}

View File

@ -47,7 +47,7 @@ namespace ShardingCore.Sharding
private readonly IRouteTailFactory _routeTailFactory;
public int? Skip { get; private set; }
public int? Take { get; }
public int? Take { get; private set;}
public IEnumerable<PropertyOrder> Orders { get; private set; }
public SelectContext SelectContext => ParseResult.GetSelectContext();
@ -112,6 +112,10 @@ namespace ShardingCore.Sharding
{
Skip = skip;
}
public void ReSetTake(int? take)
{
Take = take;
}
/// <summary>
/// 创建对应的dbcontext
/// </summary>

View File

@ -40,10 +40,20 @@ namespace ShardingCore.Sharding
}
public StreamMergeContext Create(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
var parseResult = _queryableParseEngine.Parse(mergeQueryCompilerContext);
var rewriteQueryable = _queryableRewriteEngine.GetRewriteQueryable(mergeQueryCompilerContext, parseResult);
var optimizeResult = _queryableOptimizeEngine.Optimize(mergeQueryCompilerContext, parseResult, rewriteQueryable);
return new StreamMergeContext(mergeQueryCompilerContext, parseResult, rewriteQueryable,optimizeResult, _routeTailFactory,_trackerManager,_shardingRouteConfigOptions);
}
private void CheckMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,IParseResult parseResult,IQueryable rewriteQueryable,IOptimizeResult optimizeResult)
{
if (!mergeQueryCompilerContext.IsEnumerableQuery())
{
}
}
}
}