发布x.4.3.1 优化代码结构

This commit is contained in:
xuejiaming 2022-05-07 14:00:09 +08:00
parent d9081d234a
commit 22907db4ec
84 changed files with 352 additions and 1192 deletions

View File

@ -1,9 +1,9 @@
:start
::定义版本
set EFCORE2=2.4.2.14
set EFCORE3=3.4.2.14
set EFCORE5=5.4.2.14
set EFCORE6=6.4.2.14
set EFCORE2=2.4.3.1
set EFCORE3=3.4.3.1
set EFCORE5=5.4.3.1
set EFCORE6=6.4.3.1
::删除所有bin与obj下的文件
@echo off

View File

@ -61,14 +61,14 @@ namespace ShardingCore.EFCores
#if EFCORE2
public IAsyncEnumerable<TResult> GroupExecuteAsync<TResult>(Expression query)
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
return _shardingComplierExecutor.GroupExecuteAsync<TResult>(_shardingDbContext, query);
return _shardingComplierExecutor.ExecuteAsync<TResult>(_shardingDbContext, query);
}
public Task<TResult> GroupExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
return _shardingComplierExecutor.GroupExecuteAsync<TResult>(_shardingDbContext, query, cancellationToken);
return _shardingComplierExecutor.ExecuteAsync<TResult>(_shardingDbContext, query, cancellationToken);
}
[ExcludeFromCodeCoverage]

View File

@ -39,8 +39,8 @@ namespace ShardingCore.Sharding.Abstractions
/// <param name="shardingDbContext"></param>
/// <param name="query"></param>
/// <returns></returns>
IAsyncEnumerable<TResult> GroupExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query);
Task<TResult> GroupExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, CancellationToken cancellationToken);
IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query);
Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, CancellationToken cancellationToken);
#endif
}
}

View File

@ -38,8 +38,8 @@ namespace ShardingCore.Sharding.Abstractions
/// <param name="queryCompilerContext"></param>
/// <param name="query"></param>
/// <returns></returns>
IAsyncEnumerable<TResult> GroupExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext);
Task<TResult> GroupExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken);
IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext);
Task<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken);
#endif
}
}

View File

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
{
internal interface IParallelExecuteControl<TResult>
{
Task<LinkedList<TResult>> ExecuteAsync(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -1,17 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
{
internal interface IParallelExecutor<TResult>
{
Task<ShardingMergeResult<TResult>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit,
CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -1,7 +1,6 @@
using ShardingCore.Core;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using System;
@ -139,7 +138,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
///// <param name="sqlExecutorUnitExecuteAsync"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//protected async Task<LinkedList<ShardingMergeResult<TResult>>> GroupExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
//protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
//{
// if (sqlExecutorUnits.Count <= 0)
// {

View File

@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.Abstractions

View File

@ -1,5 +1,4 @@
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines;
using System;
using System.Collections.Generic;
using System.Linq;

View File

@ -1,6 +1,5 @@
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using System.Collections;
using System.Collections.Generic;

View File

@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var resultList = await base.ExecuteAsync<AverageResult<TSelect>>(cancellationToken);
if (resultList.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
var queryable = resultList.Select(o => new
var queryable = resultList.Where(o=>o.HasQueryResult()).Select(o => new
{
Sum = o.QueryResult.Sum,
Count = o.QueryResult.Count

View File

@ -9,10 +9,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -23,19 +21,106 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> :AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
{
public MaxAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override TResult DoMergeResult(List<RouteQueryResult<TResult>> resultList)
protected override IExecutor<TR> CreateExecutor<TR>(bool async)
{
return resultList.Max(o => o.QueryResult);
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return new MaxMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(float) == resultType)
{
return new MaxMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(int) == resultType)
{
return new MaxMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(long) == resultType)
{
return new MaxMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(double) == resultType)
{
return new MaxMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<TR>;
}
throw new ShardingCoreException($"cant calc max value, type:[{resultType}]");
}
else
{
return new MaxMethodExecutor<TEntity,TEntity>(GetStreamMergeContext()) as IExecutor<TR>;
}
}
protected override IExecutor<RouteQueryResult<TResult>> CreateExecutor0(bool async)
public TResult MergeResult()
{
return new MaxMethodExecutor<TResult>(GetStreamMergeContext());
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
var result = await base.ExecuteAsync<decimal?>(cancellationToken);
return GetMaxTResult<decimal?>(result);
}
if (typeof(float) == resultType)
{
var result = await base.ExecuteAsync<float?>(cancellationToken);
return GetMaxTResult<float?>(result);
}
if (typeof(int) == resultType)
{
var result = await base.ExecuteAsync<int?>(cancellationToken);
return GetMaxTResult<int?>(result);
}
if (typeof(long) == resultType)
{
var result = await base.ExecuteAsync<long?>(cancellationToken);
return GetMaxTResult<long?>(result);
}
if (typeof(double) == resultType)
{
var result = await base.ExecuteAsync<double?>(cancellationToken);
return GetMaxTResult<double?>(result);
}
throw new ShardingCoreException($"cant calc max value, type:[{resultType}]");
}
else
{
var result = await base.ExecuteAsync<TResult>(cancellationToken);
return result.Where(o => o.HasQueryResult()).Max(o => o.QueryResult);
}
}
private TResult GetMaxTResult<TInnerSelect>(List<RouteQueryResult<TInnerSelect>> source)
{
var routeQueryResults = source.Where(o => o.QueryResult != null).ToList();
if (routeQueryResults.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
var max = routeQueryResults.Max(o => o.QueryResult);
return ConvertNumber<TInnerSelect>(max);
}
private TResult ConvertNumber<TNumber>(TNumber number)
{
if (number == null)
return default;
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
}
}
}

View File

@ -9,10 +9,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -23,19 +21,106 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,TResult>
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
{
public MinAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
}
protected override TResult DoMergeResult(List<RouteQueryResult<TResult>> resultList)
protected override IExecutor<TR> CreateExecutor<TR>(bool async)
{
return resultList.Min(o => o.QueryResult);
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return new MinMethodExecutor<TEntity,decimal?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(float) == resultType)
{
return new MinMethodExecutor<TEntity, float?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(int) == resultType)
{
return new MinMethodExecutor<TEntity, int?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(long) == resultType)
{
return new MinMethodExecutor<TEntity, long?>(GetStreamMergeContext()) as IExecutor<TR>;
}
if (typeof(double) == resultType)
{
return new MinMethodExecutor<TEntity, double?>(GetStreamMergeContext()) as IExecutor<TR>;
}
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
return new MinMethodExecutor<TEntity, TEntity>(GetStreamMergeContext()) as IExecutor<TR>;
}
}
protected override IExecutor<RouteQueryResult<TResult>> CreateExecutor0(bool async)
public TResult MergeResult()
{
return new MinMethodExecutor<TResult>(GetStreamMergeContext());
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
var result = await base.ExecuteAsync<decimal?>(cancellationToken);
return GetMinTResult<decimal?>(result);
}
if (typeof(float) == resultType)
{
var result = await base.ExecuteAsync<float?>(cancellationToken);
return GetMinTResult<float?>(result);
}
if (typeof(int) == resultType)
{
var result = await base.ExecuteAsync<int?>(cancellationToken);
return GetMinTResult<int?>(result);
}
if (typeof(long) == resultType)
{
var result = await base.ExecuteAsync<long?>(cancellationToken);
return GetMinTResult<long?>(result);
}
if (typeof(double) == resultType)
{
var result = await base.ExecuteAsync<double?>(cancellationToken);
return GetMinTResult<double?>(result);
}
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
var result = await base.ExecuteAsync<TResult>(cancellationToken);
return result.Where(o => o.HasQueryResult()).Min(o => o.QueryResult);
}
}
private TResult GetMinTResult<TInnerSelect>(List<RouteQueryResult<TInnerSelect>> source)
{
var routeQueryResults = source.Where(o => o.HasQueryResult()).ToList();
if (routeQueryResults.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
var min = routeQueryResults.Min(o => o.QueryResult);
return ConvertNumber<TInnerSelect>(min);
}
private TResult ConvertNumber<TNumber>(TNumber number)
{
if (number == null)
return default;
var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TResult));
return Expression.Lambda<Func<TResult>>(convertExpr).Compile()();
}
}
}

View File

@ -7,10 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines

View File

@ -6,10 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -6,10 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines

View File

@ -4,7 +4,7 @@ using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.TrackerEnumerators;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables;
using ShardingCore.Sharding.ShardingQueryExecutors;
/*

View File

@ -1,34 +1,23 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeContexts;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
/*
* @Author: xjm

View File

@ -1,26 +1,13 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
/*
* @Author: xjm

View File

@ -1,12 +1,9 @@
using System;
using System.Collections;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
internal class EmptyQueryEnumerator<T> : IAsyncEnumerator<T>,IEnumerator<T>
{

View File

@ -1,17 +1,15 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
internal class EmptyQueryStreamEnumerable<TShardingDbContext, TEntity> : AbstractStreamEnumerable<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext

View File

@ -1,24 +1,16 @@
using ShardingCore.Core.Internal.Visitors;
using System.Linq;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeContexts;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
/*
* @Author: xjm

View File

@ -1,33 +1,22 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
/*
* @Author: xjm

View File

@ -2,20 +2,16 @@ using System.Linq;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables
{
/*
* @Author: xjm
@ -42,7 +38,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var routeResult = GetStreamMergeContext().TableRouteResults[0];
var shardingDbContext = GetStreamMergeContext().CreateDbContext(dataSourceName, routeResult, ConnectionModeEnum.MEMORY_STRICTLY);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
var enumeratorParallelExecutor = new SingleQueryEnumeratorParallelExecutor<TEntity>();
var enumeratorParallelExecutor = new SingleQueryEnumeratorExecutor<TEntity>(GetStreamMergeContext());
if (async)
{
var asyncEnumerator = enumeratorParallelExecutor.GetAsyncEnumerator0(newQueryable).WaitAndUnwrapException();

View File

@ -16,9 +16,8 @@ using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeContexts;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumerables;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
@ -179,15 +178,15 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
}
var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult);
if (isShardingDataSource && virtualDataSourceRoute.EnablePagination)
if (isShardingDataSource)
{
dataSourceUseReverse =
EntityDataSourceUseReverseShardingPage(virtualDataSourceRoute, total);
virtualDataSourceRoute.EnablePagination && EntityDataSourceUseReverseShardingPage(virtualDataSourceRoute, total);
}
if (isShardingTable && virtualTable.EnablePagination)
if (isShardingTable)
{
tableUseReverse =
EntityTableReverseShardingPage(virtualTable, total);
virtualTable.EnablePagination && EntityTableReverseShardingPage(virtualTable, total);
}

View File

@ -5,7 +5,6 @@ using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;

View File

@ -1,10 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
{
/// <summary>
/// 断路器

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using System;
using System;
using System.Collections.Generic;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal abstract class AbstractCircuitBreaker: ICircuitBreaker
{

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.StreamMergeEngines;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class AllCircuitBreaker:AbstractCircuitBreaker
{

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.StreamMergeEngines;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class AnyCircuitBreaker:AbstractCircuitBreaker
{

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.StreamMergeEngines;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
/// <summary>
/// use First、FirstOrDefault、Last、LastOrDefault、Max、Min

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.StreamMergeEngines;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class ContainsCircuitBreaker:AbstractCircuitBreaker
{

View File

@ -1,9 +1,9 @@
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class EnumeratorCircuitBreaker : AbstractCircuitBreaker
{

View File

@ -1,6 +1,6 @@
using System.Collections.Generic;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class NoTripCircuitBreaker:AbstractCircuitBreaker
{

View File

@ -1,8 +1,8 @@
using ShardingCore.Sharding.StreamMergeEngines;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
namespace ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers
{
internal class SingleOrSingleOrDefaultCircuitBreaker : AbstractCircuitBreaker
{

View File

@ -6,12 +6,16 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
{
@ -120,7 +124,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
var enumator = new EFCore2TryCurrentAsyncEnumerator<TResult>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif

View File

@ -7,6 +7,7 @@ using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Executors.Enumerators.Abstractions;
@ -51,5 +52,18 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Enumerators
{
return _streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, GetStreamMergeContext().GetPaginationReWriteTake());
}
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -5,7 +5,6 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;

View File

@ -7,9 +7,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods

View File

@ -6,11 +6,11 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods

View File

@ -9,10 +9,10 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.AggregateMergeEngines;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -8,9 +9,10 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{
@ -20,7 +22,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
/// Author: xjm
/// Created: 2022/5/7 11:13:57
/// Email: 326308290@qq.com
internal class MaxMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
internal class MaxMethodExecutor<TEntity,TResult> : AbstractMethodExecutor<TResult>
{
public MaxMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
@ -31,38 +33,56 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
return new AnyElementCircuitBreaker(GetStreamMergeContext());
}
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
protected override Task<TResult> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return queryable.As<IQueryable<decimal>>().Select(o => (decimal?)o).MaxAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<decimal>>().Select(o => (decimal?)o).MaxAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(float) == resultType)
{
return queryable.As<IQueryable<float>>().Select(o => (float?)o).MaxAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<float>>().Select(o => (float?)o).MaxAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(int) == resultType)
{
return queryable.As<IQueryable<int>>().Select(o => (int?)o).MaxAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<int>>().Select(o => (int?)o).MaxAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(long) == resultType)
{
return queryable.As<IQueryable<long>>().Select(o => (long?)o).MaxAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<long>>().Select(o => (long?)o).MaxAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(double) == resultType)
{
return queryable.As<IQueryable<double>>().Select(o => (double?)o).MaxAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<double>>().Select(o => (double?)o).MaxAsync(cancellationToken).As<Task<TResult>>();
}
throw new ShardingCoreException($"cant calc max value, type:[{resultType}]");
}
else
{
return queryable.As<IQueryable<TEntity>>().MaxAsync(cancellationToken);
return queryable.As<IQueryable<TEntity>>().MaxAsync(cancellationToken).As<Task<TResult>>();
}
}
//private TEntity GetMaxTResult<TInnerSelect>(List<RouteQueryResult<TInnerSelect>> source)
//{
// var routeQueryResults = source.Where(o => o.QueryResult != null).ToList();
// if (routeQueryResults.IsEmpty())
// throw new InvalidOperationException("Sequence contains no elements.");
// var max = routeQueryResults.Max(o => o.QueryResult);
// return ConvertMax<TInnerSelect>(max);
//}
//private TEntity ConvertMax<TNumber>(TNumber number)
//{
// if (number == null)
// return default;
// var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TEntity));
// return Expression.Lambda<Func<TEntity>>(convertExpr).Compile()();
//}
}
}

View File

@ -8,9 +8,9 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{
@ -20,7 +20,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
/// Author: xjm
/// Created: 2022/5/7 11:13:57
/// Email: 326308290@qq.com
internal class MinMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
internal class MinMethodExecutor<TEntity,TResult> : AbstractMethodExecutor<TResult>
{
public MinMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
@ -31,37 +31,38 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
return new AnyElementCircuitBreaker(GetStreamMergeContext());
}
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
protected override Task<TResult> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEntity);
if (!resultType.IsNullableType())
{
if (typeof(decimal) == resultType)
{
return queryable.As<IQueryable<decimal>>().Select(o => (decimal?)o).MinAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<decimal>>().Select(o => (decimal?)o).MinAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(float) == resultType)
{
return queryable.As<IQueryable<float>>().Select(o => (float?)o).MinAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<float>>().Select(o => (float?)o).MinAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(int) == resultType)
{
return queryable.As<IQueryable<int>>().Select(o => (int?)o).MinAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<int>>().Select(o => (int?)o).MinAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(long) == resultType)
{
return queryable.As<IQueryable<long>>().Select(o => (long?)o).MinAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<long>>().Select(o => (long?)o).MinAsync(cancellationToken).As<Task<TResult>>();
}
if (typeof(double) == resultType)
{
return queryable.As<IQueryable<double>>().Select(o => (double?)o).MinAsync(cancellationToken).As<Task<TEntity>>();
return queryable.As<IQueryable<double>>().Select(o => (double?)o).MinAsync(cancellationToken).As<Task<TResult>>();
}
throw new ShardingCoreException($"cant calc max value, type:[{resultType}]");
throw new ShardingCoreException($"cant calc min value, type:[{resultType}]");
}
else
{
return queryable.As<IQueryable<TEntity>>().MinAsync(cancellationToken);
return queryable.As<IQueryable<TEntity>>().MinAsync(cancellationToken).As<Task<TResult>>();
}
}
}

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -6,9 +6,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{

View File

@ -9,9 +9,9 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
{
@ -44,7 +44,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
(Expression)null, cancellationToken);
#endif
#if EFCORE2
return ShardingEntityFrameworkQueryableExtensions.GroupExecuteAsync<TEntity, TEntity>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<TResult>)queryable, cancellationToken);
return ShardingEntityFrameworkQueryableExtensions.ExecuteAsync<TEntity, TEntity>(ShardingQueryableMethods.GetSumWithoutSelector(resultType), (IQueryable<TEntity>)queryable, cancellationToken);
#endif
}
}

View File

@ -9,11 +9,8 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -6,11 +6,8 @@ using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -8,10 +8,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -7,10 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -7,10 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -1,144 +0,0 @@
using ShardingCore.Core;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal abstract class AbstractParallelExecuteControl<TResult> : IParallelExecuteControl<TResult>
{
private readonly ISeqQueryProvider _seqQueryProvider;
private readonly IParallelExecutor<TResult> _executor;
/// <summary>
/// not cancelled const mark
/// </summary>
private const int notCancelled = 1;
/// <summary>
/// cancelled const mark
/// </summary>
private const int cancelled = 0;
/// <summary>
/// cancel status
/// </summary>
private int cancelStatus= notCancelled;
protected AbstractParallelExecuteControl(ISeqQueryProvider seqQueryProvider,IParallelExecutor<TResult> executor)
{
_seqQueryProvider = seqQueryProvider??throw new ArgumentNullException(nameof(seqQueryProvider));
_executor = executor;
}
protected ISeqQueryProvider GetSeqQueryProvider()
{
return _seqQueryProvider;
}
public abstract ICircuitBreaker CreateCircuitBreaker();
protected void Cancel()
{
Interlocked.Exchange(ref cancelStatus, cancelled);
}
private bool IsCancelled()
{
return cancelStatus == cancelled;
}
public async Task<LinkedList<TResult>> ExecuteAsync(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
try
{
return await ExecuteAsync0(async, dataSourceSqlExecutorUnit, cancellationToken);
}
catch
{
Cancel();
throw;
}
}
private async Task<LinkedList<TResult>> ExecuteAsync0(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var circuitBreaker = CreateCircuitBreaker();
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync(executorGroup.Groups, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
if (IsCancelled()|| circuitBreaker.IsTrip(result))
break;
}
return result;
}
/// <summary>
/// 同库同组下面的并行异步执行,需要归并成一个结果
/// </summary>
/// <param name="sqlExecutorUnits"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync(List<SqlExecutorUnit> sqlExecutorUnits, CancellationToken cancellationToken = new CancellationToken())
{
if (sqlExecutorUnits.Count <= 0)
{
return new LinkedList<ShardingMergeResult<TResult>>();
}
else
{
var result = new LinkedList<ShardingMergeResult<TResult>>();
var tasks = sqlExecutorUnits.Select(sqlExecutorUnit => _executor.ExecuteAsync(sqlExecutorUnit, cancellationToken)).ToArray();
var results = await TaskHelper.WhenAllFastFail(tasks);
foreach (var r in results)
{
result.AddLast(r);
}
return result;
}
}
protected virtual void MergeParallelExecuteResult(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
{
foreach (var parallelResult in parallelResults)
{
previewResults.AddLast(parallelResult);
}
}
}
}

View File

@ -1,33 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AllParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AllParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static AllParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AllParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var allCircuitBreaker = new AllCircuitBreaker(GetSeqQueryProvider());
allCircuitBreaker.Register(() =>
{
Cancel();
});
return allCircuitBreaker;
}
}
}

View File

@ -1,28 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AnyElementParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AnyElementParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider,executor)
{
}
public static AnyElementParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AnyElementParallelExecuteControl<TResult>(seqQueryProvider,executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new AnyElementCircuitBreaker(GetSeqQueryProvider());
}
}
}

View File

@ -1,33 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AnyParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AnyParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static AnyParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AnyParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var anyCircuitBreaker = new AnyCircuitBreaker(GetSeqQueryProvider());
anyCircuitBreaker.Register(() =>
{
Cancel();
});
return anyCircuitBreaker;
}
}
}

View File

@ -1,33 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class ContainsParallelExecuteControl<TResult>:AbstractParallelExecuteControl<TResult>
{
public ContainsParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static ContainsParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new ContainsParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new ContainsCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -1,81 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal abstract class AbstractEnumeratorParallelExecuteControl<TResult>:AbstractParallelExecuteControl<IStreamMergeAsyncEnumerator<TResult>>
{
private readonly StreamMergeContext _streamMergeContext;
protected AbstractEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor) : base(streamMergeContext, executor)
{
_streamMergeContext = streamMergeContext;
}
protected StreamMergeContext GetStreamMergeContext()
{
return _streamMergeContext;
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new EnumeratorCircuitBreaker(GetSeqQueryProvider());
}
protected override void MergeParallelExecuteResult(LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewResults, IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelResults, bool async)
{
var previewResultsCount = previewResults.Count;
if (previewResultsCount > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
var parallelCount = parallelResults.Count();
if (parallelCount == 0)
return;
//聚合
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TResult>>();
if (previewResultsCount == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TResult>(combineStreamMergeAsyncEnumerator, async);
previewInMemoryStreamEnumeratorResults.Clear();
previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TResult>)}");
}
/// <summary>
/// 合并成一个迭代器
/// </summary>
/// <param name="streamsAsyncEnumerators"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators);
public virtual IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return CombineStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -1,28 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class AppendOrderSequenceParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public AppendOrderSequenceParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor,IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -1,37 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class DefaultEnumeratorParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public DefaultEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -1,28 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class EmptyQueryEnumeratorParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public EmptyQueryEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -1,42 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class ReverseEnumeratorParallelExecuteControl<TResult>: AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public ReverseEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, GetStreamMergeContext().GetPaginationReWriteTake());
}
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -1,30 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class SequenceEnumeratorParallelExecuteControl<TResult> : AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public SequenceEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -1,30 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class SingleQueryEnumeratorParallelExecuteControl<TResult>: AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine _streamMergeCombine;
public SingleQueryEnumeratorParallelExecuteControl(StreamMergeContext streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -1,33 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class NoTripParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private NoTripParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static NoTripParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new NoTripParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new NoTripCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -1,33 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class SingleOrSingleOrDefaultParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private SingleOrSingleOrDefaultParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static SingleOrSingleOrDefaultParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new SingleOrSingleOrDefaultParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new SingleOrSingleOrDefaultCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -1,76 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal abstract class AbstractEnumeratorParallelExecutor<TEntity>:IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>>
{
public abstract Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(
SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// 开启异步线程获取并发迭代器
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
/// <summary>
/// 获取异步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public async Task<IAsyncEnumerator<TEntity>> GetAsyncEnumerator0(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
/// <summary>
/// 获取同步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public IEnumerator<TEntity> GetEnumerator0(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
}
}

View File

@ -1,39 +0,0 @@
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class AppendOrderSequenceEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly IQueryable<TEntity> _noPaginationQueryable;
private readonly bool _async;
public AppendOrderSequenceEnumeratorParallelExecutor(StreamMergeContext streamMergeContext, bool async)
{
_streamMergeContext = streamMergeContext;
_noPaginationQueryable = streamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().As<IQueryable<TEntity>>();
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var sequenceResult = sqlExecutorUnit.RouteUnit.As<SqlSequenceRouteUnit>().SequenceResult;
var shardingDbContext = _streamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = _noPaginationQueryable
.Skip(sequenceResult.Skip)
.Take(sequenceResult.Take)
.OrderWithExpression(_streamMergeContext.Orders)
.ReplaceDbContextQueryable(shardingDbContext)
.As<IQueryable<TEntity>>();
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(shardingDbContext,streamMergeAsyncEnumerator);
}
}
}

View File

@ -1,46 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class DefaultEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly bool _async;
public DefaultEnumeratorParallelExecutor(StreamMergeContext streamMergeContext,bool async)
{
_streamMergeContext = streamMergeContext;
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var shardingDbContext = _streamMergeContext.CreateDbContext(dataSourceName, routeResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)_streamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(shardingDbContext, streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)_streamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class InMemoryParallelExecutor<TEntity, TResult> : IParallelExecutor<RouteQueryResult<TResult>>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly Func<IQueryable, Task<TResult>> _efQuery;
public InMemoryParallelExecutor(StreamMergeContext streamMergeContext, Func<IQueryable, Task<TResult>> efQuery)
{
_streamMergeContext = streamMergeContext;
_efQuery = efQuery;
}
public async Task<ShardingMergeResult<RouteQueryResult<TResult>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var shardingDbContext = _streamMergeContext.CreateDbContext(dataSourceName, routeResult, connectionMode);
var newQueryable = _streamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var queryResult = await _efQuery(newQueryable);
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
return new ShardingMergeResult<RouteQueryResult<TResult>>(shardingDbContext, routeQueryResult);
}
}
}

View File

@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class ReverseEnumeratorParallelExecutor<TEntity> : AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly IOrderedQueryable<TEntity> _reverseOrderQueryable;
private readonly bool _async;
public ReverseEnumeratorParallelExecutor(StreamMergeContext streamMergeContext, IOrderedQueryable<TEntity> reverseOrderQueryable, bool async)
{
_streamMergeContext = streamMergeContext;
_reverseOrderQueryable = reverseOrderQueryable;
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var shardingDbContext = _streamMergeContext.CreateDbContext(dataSourceName, routeResult, connectionMode);
var newQueryable = _reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext).As<IQueryable<TEntity>>();
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(shardingDbContext,
streamMergeAsyncEnumerator);
}
}
}

View File

@ -1,49 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class SequenceEnumeratorParallelExecutor<TEntity>: AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext _streamMergeContext;
private readonly bool _async;
private readonly IQueryable<TEntity> _noPaginationQueryable;
public SequenceEnumeratorParallelExecutor(StreamMergeContext streamMergeContext,bool async)
{
_streamMergeContext = streamMergeContext;
_async = async;
_noPaginationQueryable = streamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().As<IQueryable<TEntity>>();
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var sequenceResult = sqlExecutorUnit.RouteUnit.As<SqlSequenceRouteUnit>().SequenceResult;
var shardingDbContext = _streamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = _noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take)
.ReplaceDbContextQueryable(shardingDbContext).As<IQueryable<TEntity>>();
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(shardingDbContext, streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable( SequenceResult sequenceResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = _noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take)
.ReplaceDbContextQueryable(shardingDbContext).As<IQueryable<TEntity>>();
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -1,21 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class SingleQueryEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
public override Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}
}
}

View File

@ -7,10 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -8,10 +8,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -93,7 +93,7 @@ namespace ShardingCore.Sharding
#if EFCORE2
public static Task<TResult> GroupExecuteAsync<TSource, TResult>(
public static Task<TResult> ExecuteAsync<TSource, TResult>(
MethodInfo operatorMethodInfo,
IQueryable<TSource> source,
CancellationToken cancellationToken = default (CancellationToken))
@ -104,7 +104,7 @@ namespace ShardingCore.Sharding
operatorMethodInfo = operatorMethodInfo.MakeGenericMethod(typeof (TSource));
MethodCallExpression methodCallExpression = Expression.Call((Expression) null, operatorMethodInfo, source.Expression);
CancellationToken cancellationToken1 = cancellationToken;
return provider.GroupExecuteAsync<TResult>((Expression) methodCallExpression, cancellationToken1);
return provider.ExecuteAsync<TResult>((Expression) methodCallExpression, cancellationToken1);
}
public static TResult Execute<TSource, TResult>(
MethodInfo operatorMethodInfo,

View File

@ -59,7 +59,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
#endif
#if EFCORE2
public IAsyncEnumerable<TResult> GroupExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query)
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query)
{
//预解析表达式
var prepareParseResult = _prepareParser.Parse(shardingDbContext, query);
@ -67,11 +67,11 @@ namespace ShardingCore.Sharding.ShardingExecutors
using (new CustomerQueryScope(prepareParseResult))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.GroupExecuteAsync<TResult>(queryCompilerContext);
return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext);
}
}
public Task<TResult> GroupExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query,
public Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query,
CancellationToken cancellationToken)
{
//预解析表达式
@ -80,7 +80,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
using (new CustomerQueryScope(prepareParseResult))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.GroupExecuteAsync<TResult>(queryCompilerContext, cancellationToken);
return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext, cancellationToken);
}
}
#endif

View File

@ -135,37 +135,37 @@ namespace ShardingCore.Sharding.ShardingExecutors
#endif
#if EFCORE2
public IAsyncEnumerable<TResult> GroupExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext)
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.GroupExecuteAsync<IAsyncEnumerable<TResult>>(mergeQueryCompilerContext);
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
//native query
var result = queryCompilerExecutor.GetQueryCompiler().GroupExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
var result = queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
//native query track
return ResultTrackExecute(result, queryCompilerContext, TrackAsyncEnumerable, Track);
}
public Task<TResult> GroupExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken)
public Task<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.GroupExecuteAsync<Task<TResult>>(mergeQueryCompilerContext);
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
//native query
var result = queryCompilerExecutor.GetQueryCompiler().GroupExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
var result = queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
//native query track
return ResultTrackExecute(result, queryCompilerContext, TrackEnumerable, TrackAsync);