完成顺序查询第一版本[#96]

This commit is contained in:
xuejiaming 2022-01-24 16:12:20 +08:00
parent a66b3cbf05
commit e41b479724
90 changed files with 2030 additions and 569 deletions

View File

@ -5,7 +5,7 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>10.0</LangVersion>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -13,6 +13,7 @@ namespace Sample.MySql.DbContexts
{
//切记不要在构造函数中使用会让模型提前创建的方法
//ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
//Database.SetCommandTimeout(30000);
}
protected override void OnModelCreating(ModelBuilder modelBuilder)

View File

@ -13,6 +13,7 @@ using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.ShardingEnumerableQueries;
using ShardingCore.Core.VirtualDatabase;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Sharding.EntityQueryConfigurations;
namespace ShardingCore.Core.VirtualTables
{
@ -32,7 +33,7 @@ namespace ShardingCore.Core.VirtualTables
private readonly IVirtualTableRoute<T> _virtualTableRoute;
/// <summary>
/// 分配置
/// 分配置
/// </summary>
public PaginationMetadata PaginationMetadata { get; }
@ -40,6 +41,14 @@ namespace ShardingCore.Core.VirtualTables
/// 是否启用智能分页
/// </summary>
public bool EnablePagination => PaginationMetadata != null;
/// <summary>
/// 查询配置
/// </summary>
public EntityQueryMetadata EntityQueryMetadata { get; }
/// <summary>
/// 是否启用表达式分片配置
/// </summary>
public bool EnableEntityQuery => EntityQueryMetadata != null;
private readonly ConcurrentDictionary<IPhysicTable, object> _physicTables = new ConcurrentDictionary<IPhysicTable, object>();
@ -55,6 +64,14 @@ namespace ShardingCore.Core.VirtualTables
var paginationBuilder = new PaginationBuilder<T>(PaginationMetadata);
paginationConfiguration.Configure(paginationBuilder);
}
var entityQueryConfiguration = virtualTableRoute.CreateEntityQueryConfiguration();
if (entityQueryConfiguration != null)
{
EntityQueryMetadata = new EntityQueryMetadata();
var entityQueryBuilder = new EntityQueryBuilder<T>(EntityQueryMetadata);
entityQueryConfiguration.Configure(entityQueryBuilder);
}
}
public List<IPhysicTable> GetAllPhysicTables()

View File

@ -6,6 +6,7 @@ using ShardingCore.Core.PhysicTables;
using ShardingCore.Core.VirtualDatabase;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Sharding.EntityQueryConfigurations;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualTables
@ -30,6 +31,14 @@ namespace ShardingCore.Core.VirtualTables
/// 是否启用分页配置
/// </summary>
bool EnablePagination { get; }
/// <summary>
/// 查询配置
/// </summary>
EntityQueryMetadata EntityQueryMetadata { get; }
/// <summary>
/// 是否启用表达式分片配置
/// </summary>
bool EnableEntityQuery { get; }
/// <summary>
/// 获取所有的物理表

View File

@ -5,6 +5,7 @@ using ShardingCore.Sharding.MergeEngines.ParallelControl;
using ShardingCore.Sharding.PaginationConfigurations;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.EntityQueryConfigurations;
namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions
{
@ -14,12 +15,17 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions
* @Date: Friday, 18 December 2020 14:33:01
* @Email: 326308290@qq.com
*/
public abstract class AbstractVirtualDataSourceRoute<T, TKey> : IVirtualDataSourceRoute<T>, IEntityMetadataAutoBindInitializer where T : class
public abstract class AbstractVirtualDataSourceRoute<TEntity, TKey> : IVirtualDataSourceRoute<TEntity>, IEntityMetadataAutoBindInitializer where TEntity : class
{
public EntityMetadata EntityMetadata { get; private set; }
private readonly DoOnlyOnce _doOnlyOnce = new DoOnlyOnce();
public IShardingEntityConfigOptions EntityConfigOptions { get; private set; }
public new PaginationMetadata PaginationMetadata { get; protected set; }
public bool EnablePagination => PaginationMetadata != null;
//public new EntityQueryMetadata EntityQueryMetadata { get; protected set; }
//public bool EnableEntityQuery => EnableEntityQuery != null;
public void Initialize(EntityMetadata entityMetadata)
{
@ -30,21 +36,32 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions
if (paginationConfiguration != null)
{
PaginationMetadata = new PaginationMetadata();
var paginationBuilder = new PaginationBuilder<T>(PaginationMetadata);
var paginationBuilder = new PaginationBuilder<TEntity>(PaginationMetadata);
paginationConfiguration.Configure(paginationBuilder);
}
//var entityQueryConfiguration = CreateEntityQueryConfiguration();
//if (entityQueryConfiguration != null)
//{
// EntityQueryMetadata = new EntityQueryMetadata();
// var entityQueryBuilder= new EntityQueryBuilder<TEntity>(EntityQueryMetadata);
// entityQueryConfiguration.Configure(entityQueryBuilder);
//}
EntityConfigOptions =
ShardingContainer.GetRequiredShardingEntityConfigOption(entityMetadata.ShardingDbContextType);
}
public virtual IPaginationConfiguration<T> CreatePaginationConfiguration()
public virtual IPaginationConfiguration<TEntity> CreatePaginationConfiguration()
{
return null;
}
public new PaginationMetadata PaginationMetadata { get; protected set; }
public bool EnablePagination => PaginationMetadata != null;
//public virtual IEntityQueryConfiguration<TEntity> CreateEntityQueryConfiguration()
//{
// return null;
//}
/// <summary>
/// 分库字段如何转成对应的数据源名称 how convert sharding data source key to data source name
@ -75,6 +92,6 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes.Abstractions
/// 2.AutoCreateDataSource 启动时是否需要创建数据源
/// </summary>
/// <param name="builder"></param>
public abstract void Configure(EntityMetadataDataSourceBuilder<T> builder);
public abstract void Configure(EntityMetadataDataSourceBuilder<TEntity> builder);
}
}

View File

@ -4,6 +4,7 @@ using System.Linq;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.EntityShardingMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Sharding.EntityQueryConfigurations;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes
@ -59,5 +60,10 @@ namespace ShardingCore.Core.VirtualRoutes.DataSourceRoutes
/// </summary>
/// <returns></returns>
IPaginationConfiguration<TEntity> CreatePaginationConfiguration();
///// <summary>
///// 配置查询
///// </summary>
///// <returns></returns>
//IEntityQueryConfiguration<TEntity> CreateEntityQueryConfiguration();
}
}

View File

@ -6,6 +6,7 @@ using ShardingCore.Sharding.MergeEngines.ParallelControl;
using ShardingCore.Sharding.PaginationConfigurations;
using System.Collections.Generic;
using System.Linq;
using ShardingCore.Sharding.EntityQueryConfigurations;
namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
{
@ -29,10 +30,7 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
EntityConfigOptions =
ShardingContainer.GetRequiredShardingEntityConfigOption(entityMetadata.ShardingDbContextType);
}
public virtual IPaginationConfiguration<T> CreatePaginationConfiguration()
{
return null;
}
public EntityMetadata EntityMetadata { get; private set; }
@ -73,5 +71,14 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes.Abstractions
/// </summary>
/// <param name="builder"></param>
public abstract void Configure(EntityMetadataTableBuilder<T> builder);
public virtual IPaginationConfiguration<T> CreatePaginationConfiguration()
{
return null;
}
public virtual IEntityQueryConfiguration<T> CreateEntityQueryConfiguration()
{
return null;
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.PhysicTables;
using ShardingCore.Sharding.EntityQueryConfigurations;
using ShardingCore.Sharding.PaginationConfigurations;
namespace ShardingCore.Core.VirtualRoutes.TableRoutes
@ -46,13 +47,18 @@ namespace ShardingCore.Core.VirtualRoutes.TableRoutes
}
public interface IVirtualTableRoute<T> : IVirtualTableRoute, IEntityMetadataTableConfiguration<T> where T : class
public interface IVirtualTableRoute<TEntity> : IVirtualTableRoute, IEntityMetadataTableConfiguration<TEntity> where TEntity : class
{
/// <summary>
/// 返回null就是表示不开启分页配置
/// </summary>
/// <returns></returns>
IPaginationConfiguration<T> CreatePaginationConfiguration();
IPaginationConfiguration<TEntity> CreatePaginationConfiguration();
/// <summary>
/// 配置查询
/// </summary>
/// <returns></returns>
IEntityQueryConfiguration<TEntity> CreateEntityQueryConfiguration();
}
}

View File

@ -1,34 +0,0 @@
//using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
//using System.Threading.Tasks;
//using Microsoft.EntityFrameworkCore;
//using ShardingCore.Core;
//namespace ShardingCore.Extensions
//{
// internal static class ConnectionModeExtension
// {
// public static async Task<T> ReleaseConnectionAsync<T>(this Task<T> executeTask, DbContext dbContext,
// ConnectionModeEnum connectionMode)
// {
// try
// {
// return await executeTask;
// }
// finally
// {
//// if (connectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
//// {
////#if !EFCORE2
//// await dbContext.DisposeAsync();
////#endif
////#if EFCORE2
//// dbContext.Dispose();
////#endif
//// }
// }
// }
// }
//}

View File

@ -15,6 +15,11 @@ namespace ShardingCore.Extensions.InternalExtensions
*/
internal static class InternalLinqExtension
{
public static IEnumerable<TSource> OrderByAscDescIf<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector,bool asc,
IComparer<TKey>? comparer)
{
return asc?source.OrderBy(keySelector, comparer): source.OrderByDescending(keySelector, comparer);
}
public static IOrderedEnumerable<TSource> ThenByIf<TSource, TKey>(this IOrderedEnumerable<TSource> source, Func<TSource, TKey> keySelector, bool condition,
IComparer<TKey>? comparer)
{

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding;
namespace ShardingCore.Extensions.InternalExtensions
{
internal static class StreamMergeContextExtension
{
public static bool IsSeqQuery<TEntity>( this StreamMergeContext<TEntity> streamMergeContext)
{
return streamMergeContext.EntitySeqQueryConfig != null;
}
public static bool IsParallelExecute<TEntity>( this StreamMergeContext<TEntity> streamMergeContext)
{
return streamMergeContext.TableRouteResults.Length <= streamMergeContext.GetMaxQueryConnectionsLimit();
}
}
}

View File

@ -1,48 +0,0 @@
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 02 September 2021 20:46:24
* @Email: 326308290@qq.com
*/
public static class StreamMergeContextExtension
{
///// <summary>
///// 单表查询
///// </summary>
///// <typeparam name="TEntity"></typeparam>
///// <param name="streamMergeContext"></param>
///// <returns></returns>
//public static bool IsSingleShardingTableQuery<TEntity>(this StreamMergeContext<TEntity> streamMergeContext)
//{
// return streamMergeContext.TableRouteResults.First().ReplaceTables.Count(o => o.EntityType.IsShardingTable()) == 1;
//}
///// <summary>
///// 本次查询仅包含一个对象的分表分库
///// </summary>
///// <typeparam name="TEntity"></typeparam>
///// <param name="streamMergeContext"></param>
///// <returns></returns>
//public static bool IsSingleShardingQuery<TEntity>(this StreamMergeContext<TEntity> streamMergeContext)
//{
// return streamMergeContext.GetOriginalQueryable().ParseQueryableRoute().Count(o=>o.IsShardingTable()||o.IsShardingDataSource())==1;
//}
public static bool IsSupportPaginationQuery<TShardingDbContext,TEntity>(this StreamMergeContext<TEntity> streamMergeContext) where TShardingDbContext:DbContext,IShardingDbContext
{
var entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<TShardingDbContext>>();
var queryEntities = streamMergeContext.GetOriginalQueryable().ParseQueryableEntities(typeof(TShardingDbContext));
//仅一个对象支持分库或者分表的组合
return queryEntities.Count(o=>(entityMetadataManager.IsShardingDataSource(o) &&!entityMetadataManager.IsShardingTable(o)) ||(entityMetadataManager.IsShardingDataSource(o)&& entityMetadataManager.IsShardingTable(o))|| (!entityMetadataManager.IsShardingDataSource(o) && entityMetadataManager.IsShardingTable(o))) ==1;
}
}
}

View File

@ -150,5 +150,6 @@ namespace ShardingCore.Extensions
cancellationToken.ThrowIfCancellationRequested();
}
}
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Helpers
{
internal class TaskHelper
{
private TaskHelper()
{
throw new InvalidOperationException(nameof(TaskHelper));
}
public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
{
if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());
// defensive copy.
var defensive = tasks.Clone() as Task<TResult>[];
var tcs = new TaskCompletionSource<TResult[]>();
var remaining = defensive.Length;
Action<Task> check = t =>
{
switch (t.Status)
{
case TaskStatus.Faulted:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetException(t.Exception.InnerException);
break;
case TaskStatus.Canceled:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetCanceled();
break;
default:
// we can safely set here as no other task remains to run.
if (Interlocked.Decrement(ref remaining) == 0)
{
// get the results into an array.
var results = new TResult[defensive.Length];
for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result;
tcs.SetResult(results);
}
break;
}
};
foreach (var task in defensive)
{
task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
return tcs.Task;
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Abstractions
{
public interface ISeqQueryProvider
{
bool IsSeqQuery();
bool IsParallelExecute();
}
}

View File

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
{
/// <summary>
/// 断路器
/// </summary>
public interface ICircuitBreaker
{
/// <summary>
/// 是否拉闸表示是否终端
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="results"></param>
/// <returns></returns>
bool IsTrip<TResult>(IEnumerable<TResult> results);
/// <summary>
/// 跳闸
/// </summary>
void Trip();
void Register(Action afterTrip);
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
{
internal interface IParallelExecuteControl<TResult>
{
Task<LinkedList<TResult>> ExecuteAsync(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.Abstractions.ParallelExecutors
{
internal interface IParallelExecutor<TResult>
{
Task<ShardingMergeResult<TResult>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit,
CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
namespace ShardingCore.Sharding.EntityQueryConfigurations
{
public class EntityQueryBuilder<TEntity> where TEntity : class
{
private readonly EntityQueryMetadata _entityQueryMetadata;
public EntityQueryBuilder(EntityQueryMetadata entityQueryMetadata)
{
_entityQueryMetadata = entityQueryMetadata;
}
/// <summary>
/// 添加条件顺序查询配置
/// </summary>
/// <typeparam name="TProperty"></typeparam>
/// <param name="primaryOrderPropertyExpression">主排序字段</param>
/// <param name="parallelThreadQueryCount">迭代器获取该值不生效</param>
/// <param name="routeComparer"></param>
/// <returns></returns>
public EntityQueryBuilder<TEntity> AddEntityQuerySeqConfig<TProperty>(Expression<Func<TEntity, TProperty>> primaryOrderPropertyExpression, int parallelThreadQueryCount, IComparer<string> routeComparer = null)
{
var entitySeqQueryConfig = new EntitySeqQueryConfig(primaryOrderPropertyExpression, parallelThreadQueryCount, routeComparer);
if (_entityQueryMetadata.EntityOrderSeqQueryConfigs.ContainsKey(entitySeqQueryConfig.PrimaryOrderPropertyInfo.Name))
{
throw new ShardingCoreConfigException(
$"repeat {nameof(AddEntityQuerySeqConfig)} property name:[{entitySeqQueryConfig.PrimaryOrderPropertyInfo.Name}]");
}
_entityQueryMetadata.EntityOrderSeqQueryConfigs.Add(entitySeqQueryConfig.PrimaryOrderPropertyInfo.Name, entitySeqQueryConfig);
return this;
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.EntityQueryConfigurations
{
public class EntityQueryMetadata
{
public IDictionary<string, EntitySeqQueryConfig> EntityOrderSeqQueryConfigs { get; }
public EntityQueryMetadata()
{
EntityOrderSeqQueryConfigs = new Dictionary<string, EntitySeqQueryConfig>();//倒叙comparer
}
public bool TryGetSeqQueryConfig(string orderPropertyName, out EntitySeqQueryConfig seqQueryConfig)
{
if (!string.IsNullOrWhiteSpace(orderPropertyName))
{
if (EntityOrderSeqQueryConfigs.TryGetValue(orderPropertyName, out seqQueryConfig))
{
return true;
}
}
seqQueryConfig = null;
return false;
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
namespace ShardingCore.Sharding.EntityQueryConfigurations
{
/// <summary>
/// 对象顺序查询
/// </summary>
public class EntitySeqQueryConfig
{
public IComparer<string> RouteComparer { get; }
public PropertyInfo PrimaryOrderPropertyInfo { get; }
public int ParallelThreadQueryCount { get; }
/// <summary>
///
/// </summary>
/// <param name="primaryOrderPropertyExpression">排序字段</param>
/// <param name="parallelThreadQueryCount">并发线程数</param>
/// <param name="routeComparer">tail后缀比较器 asc</param>
/// <param name="maxTake">最大查询条数</param>
public EntitySeqQueryConfig(LambdaExpression primaryOrderPropertyExpression,int parallelThreadQueryCount, IComparer<string> routeComparer = null)
{
if (primaryOrderPropertyExpression == null) throw new ArgumentNullException(nameof(primaryOrderPropertyExpression));
if(parallelThreadQueryCount<=0) throw new ArgumentException(nameof(parallelThreadQueryCount));
PrimaryOrderPropertyInfo = primaryOrderPropertyExpression.GetPropertyAccess();
ParallelThreadQueryCount = parallelThreadQueryCount;
RouteComparer=routeComparer??Comparer<string>.Default;
}
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.EntityQueryConfigurations
{
public interface IEntityQueryConfiguration<TEntity> where TEntity:class
{
void Configure(EntityQueryBuilder<TEntity> builder);
}
}

View File

@ -1,20 +1,15 @@
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.ParallelControl;
using ShardingCore.Core;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.NotSupportShardingProviders;
using ShardingCore.Core.NotSupportShardingProviders.Abstractions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.Common.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
@ -28,46 +23,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
internal abstract class AbstractBaseMergeEngine<TEntity>
{
protected abstract StreamMergeContext<TEntity> GetStreamMergeContext();
///// <summary>
///// 异步多线程控制并发
///// </summary>
///// <typeparam name="TResult"></typeparam>
///// <param name="executeAsync"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//public Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
//{
// cancellationToken.ThrowIfCancellationRequested();
// var acquired = this._semaphore.Wait((int)parallelTimeOut, cancellationToken);
// if (acquired)
// {
// var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
// try
// {
// return Task.Run(async () =>
// {
// try
// {
// return await executeAsync();
// }
// finally
// {
// once.Release();
// }
// }, cancellationToken);
// }
// catch (Exception)
// {
// once.Release();
// throw;
// }
// }
// else
// {
// throw new ShardingCoreParallelQueryTimeOutException($"execute async time out:[{timeOut.TotalMilliseconds}ms]");
// }
//}
protected bool IsUnSupport()
{
return GetStreamMergeContext().IsUnSupportSharding();
@ -79,11 +34,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
/// <typeparam name="TResult"></typeparam>
/// <param name="async"></param>
/// <param name="sqlRouteUnits"></param>
/// <param name="sqlExecutorUnitExecuteAsync"></param>
/// <param name="executor"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
public Task<LinkedList<TResult>>[] GetDataSourceGroupAndExecutorGroup<TResult>(bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits,IParallelExecutor<TResult> executor, CancellationToken cancellationToken = new CancellationToken())
{
var parallelExecuteControl = CreateParallelExecuteControl(executor);
var waitTaskQueue = AggregateQueryByDataSourceName(sqlRouteUnits)
.Select(GetSqlExecutorGroups)
.Select(dataSourceSqlExecutorUnit =>
@ -97,12 +53,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
((UnSupportSqlRouteUnit)dataSourceSqlExecutorUnit.SqlExecutorGroups[0].Groups[0]
.RouteUnit).TableRouteResults))
{
return await DoExecuteAsync(async, dataSourceSqlExecutorUnit, sqlExecutorUnitExecuteAsync, cancellationToken);
return await parallelExecuteControl.ExecuteAsync(async, dataSourceSqlExecutorUnit, cancellationToken);
}
}
else
{
return await DoExecuteAsync(async, dataSourceSqlExecutorUnit, sqlExecutorUnitExecuteAsync, cancellationToken);
return await parallelExecuteControl.ExecuteAsync(async, dataSourceSqlExecutorUnit, cancellationToken);
}
// var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
// LinkedList<TResult> result = new LinkedList<TResult>();
@ -142,56 +98,11 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
return waitTaskQueue;
}
public async Task<LinkedList<TResult>> DoExecuteAsync<TResult>(bool async,DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
}
return result;
}
public virtual void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
{
foreach (var parallelResult in parallelResults)
{
previewResults.AddLast(parallelResult);
}
}
protected abstract IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor);
protected virtual IEnumerable<ISqlRouteUnit> GetDefaultSqlRouteUnits()
{
var streamMergeContext = GetStreamMergeContext();
return streamMergeContext.DataSourceRouteResult.IntersectDataSources.SelectMany(
dataSourceName =>
{
@ -205,7 +116,28 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
}
protected virtual IEnumerable<IGrouping<string, ISqlRouteUnit>> AggregateQueryByDataSourceName(IEnumerable<ISqlRouteUnit> sqlRouteUnits)
{
return sqlRouteUnits.GroupBy(o => o.DataSourceName);
return ReOrderTableTails(sqlRouteUnits).GroupBy(o => o.DataSourceName);
}
/// <summary>
/// 顺序查询从重排序
/// </summary>
/// <param name="sqlRouteUnits"></param>
/// <returns></returns>
private IEnumerable<ISqlRouteUnit> ReOrderTableTails(IEnumerable<ISqlRouteUnit> sqlRouteUnits)
{
var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.EntitySeqQueryConfig != null)
{
var equalPropertyOrder = ExecuteOrderEqualPropertyOrder();
return sqlRouteUnits.OrderByAscDescIf(o => o.TableRouteResult.ReplaceTables.First().Tail,
(equalPropertyOrder?streamMergeContext.PrimaryOrderAsc.Value: !streamMergeContext.PrimaryOrderAsc.Value), streamMergeContext.EntitySeqQueryConfig.RouteComparer);
}
return sqlRouteUnits;
}
protected virtual bool ExecuteOrderEqualPropertyOrder()
{
return true;
}
/// <summary>
/// 每个数据源下的分表结果按 maxQueryConnectionsLimit 进行组合分组每组大小 maxQueryConnectionsLimit
@ -263,7 +195,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
}
var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]);
result.AddLast(firstResult);
var otherResults = await Task.WhenAll(tasks);
var otherResults = await TaskHelper.WhenAllFastFail(tasks);
foreach (var otherResult in otherResults)
{
result.AddLast(otherResult);
@ -271,7 +203,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
return result;
}
}
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
internal interface IStreamMergeCombine<TEntity>
{
IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
}
}

View File

@ -1,19 +1,11 @@
using System;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.StreamMergeEngines;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Helpers;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
@ -33,58 +25,19 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
_mergeContext = streamMergeContext;
}
private (IQueryable queryable, DbContext dbContext) CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _mergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
;
}
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var routeQueryResults = _mergeContext.PreperExecute(() => new List<RouteQueryResult<TResult>>(0));
if (routeQueryResults != null)
return routeQueryResults;
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(true, defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = _mergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var inMemoryParallelExecutor = new InMemoryParallelExecutor<TEntity,TResult>(_mergeContext,efQuery);
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<RouteQueryResult<TResult>>(true, defaultSqlRouteUnits, inMemoryParallelExecutor).ToArray();
var (asyncExecuteQueryable, dbContext) =
CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult, connectionMode);
var queryResult = await efQuery(asyncExecuteQueryable);
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
return new ShardingMergeResult<RouteQueryResult<TResult>>(dbContext, routeQueryResult);
}).ToArray();
return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o => o).ToList();
return (await TaskHelper.WhenAllFastFail(waitExecuteQueue)).SelectMany(o => o).ToList();
}
///// <summary>
///// 异步并发查询
///// </summary>
///// <typeparam name="TResult"></typeparam>
///// <param name="queryable"></param>
///// <param name="dataSourceName"></param>
///// <param name="routeResult"></param>
///// <param name="efQuery"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//public async Task<RouteQueryResult<TResult>> AsyncParallelResultExecute<TResult>(IQueryable queryable,string dataSourceName,TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery,
// CancellationToken cancellationToken = new CancellationToken())
//{
// cancellationToken.ThrowIfCancellationRequested();
// var queryResult = await efQuery(queryable);
// return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//}
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
{

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
internal abstract class AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
protected AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
protected override IParallelExecuteControl<TQResult> CreateParallelExecuteControl<TQResult>(IParallelExecutor<TQResult> executor)
{
return NoTripParallelExecuteControl<TQResult>.Create(GetStreamMergeContext(), executor);
}
}
}

View File

@ -1,16 +1,11 @@
using System;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
@ -26,6 +21,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
*/
internal abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IEnumeratorStreamMergeEngine<TEntity>
{
private readonly IStreamMergeCombine<TEntity> _streamMergeCombine;
public StreamMergeContext<TEntity> StreamMergeContext { get; }
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
@ -33,8 +29,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
return StreamMergeContext;
}
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
protected IStreamMergeCombine<TEntity> GetStreamMergeCombine()
{
return _streamMergeCombine;
}
protected AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext,IStreamMergeCombine<TEntity> streamMergeCombine)
{
_streamMergeCombine = streamMergeCombine;
StreamMergeContext = streamMergeContext;
}
@ -94,115 +96,24 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// 合并流式聚合内存最小化
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="previewResults"></param>
/// <param name="parallelResults"></param>
/// <param name="async"></param>
/// <returns></returns>
/// <exception cref="ShardingCoreInvalidOperationException"></exception>
public override void MergeParallelExecuteResult<TResult>(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults,bool async)
{
var previewResultsCount = previewResults.Count;
if (previewResultsCount > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
var parallelCount = parallelResults.Count();
if (parallelCount == 0)
return;
//聚合
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TEntity>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TEntity>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TEntity>>();
if (previewResultsCount == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TEntity>(combineStreamMergeAsyncEnumerator, async);
previewInMemoryStreamEnumeratorResults.Clear();
previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TEntity>)}");
}
/// <summary>
/// 合并成一个迭代器
/// </summary>
/// <param name="streamsAsyncEnumerators"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
public virtual IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
private IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
return CombineStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return GetStreamMergeCombine().StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
/// <summary>
/// 开启异步线程获取并发迭代器
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
CancellationToken cancellationToken = new CancellationToken())
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(
IParallelExecutor<TResult> executor)
{
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
/// <summary>
/// 获取异步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public async Task<IAsyncEnumerator<TEntity>> GetAsyncEnumerator0(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
/// <summary>
/// 获取同步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public IEnumerator<TEntity> GetEnumerator0(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
return CreateParallelExecuteControl0(executor as IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>>) as IParallelExecuteControl<TResult>;
}
protected abstract IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(
IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor);
}
}

View File

@ -1,7 +1,6 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.MergeEngines.AggregateMergeEngines;
@ -21,7 +20,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class AverageAsyncInMemoryMergeEngine<TEntity, TResult,TSelect> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
internal class AverageAsyncInMemoryMergeEngine<TEntity, TResult,TSelect> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
public AverageAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
@ -83,5 +82,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
}
}
}
}

View File

@ -9,6 +9,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -19,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
internal class MaxAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
public MaxAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{

View File

@ -9,6 +9,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -19,7 +21,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,TResult>
internal class MinAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity,TResult>
{
public MinAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{

View File

@ -1,6 +1,5 @@
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using System;
@ -19,7 +18,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class SumAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
internal class SumAsyncInMemoryMergeEngine<TEntity, TResult> : AbstractNoTripEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
public SumAsyncInMemoryMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{

View File

@ -6,6 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines
@ -40,5 +42,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return result.All(o => o.QueryResult);
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AllParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -5,6 +5,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -28,5 +30,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return result.Any(o => o.QueryResult);
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AnyParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -5,6 +5,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
namespace ShardingCore.Sharding.StreamMergeEngines
@ -35,5 +37,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return result.Any(o => o.QueryResult);
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return ContainsParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -6,6 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -39,5 +41,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return result.Sum(o=>o.QueryResult);
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return NoTripParallelExecuteControl<TResult>.Create(GetStreamMergeContext(), executor);
}
}
}

View File

@ -9,12 +9,18 @@ using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
@ -33,7 +39,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private readonly PaginationSequenceConfig _dataSourceSequenceOrderConfig;
private readonly PaginationSequenceConfig _tableSequenceOrderConfig;
private readonly ICollection<RouteQueryResult<long>> _routeQueryResults;
public AppendOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationSequenceConfig dataSourceSequenceOrderConfig, PaginationSequenceConfig tableSequenceOrderConfig, ICollection<RouteQueryResult<long>> routeQueryResults) : base(streamMergeContext)
public AppendOrderSequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationSequenceConfig dataSourceSequenceOrderConfig, PaginationSequenceConfig tableSequenceOrderConfig, ICollection<RouteQueryResult<long>> routeQueryResults) : base(streamMergeContext,new AppendOrderSequenceStreamMergeCombine<TEntity>())
{
_dataSourceSequenceOrderConfig = dataSourceSequenceOrderConfig;
_tableSequenceOrderConfig = tableSequenceOrderConfig;
@ -80,10 +86,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
sortRouteResults = sortRouteResults.OrderByDescending(o => o.DataSourceName,
_dataSourceSequenceOrderConfig.RouteComparer).ThenByDescendingIf(o => o.Tail, useThenBy, _tableSequenceOrderConfig?.RouteComparer);
}
reSetOrders.Add(new PropertyOrder(_dataSourceSequenceOrderConfig.PropertyName, _dataSourceSequenceOrderConfig.AppendAsc));
reSetOrders.Add(new PropertyOrder(_dataSourceSequenceOrderConfig.PropertyName, _dataSourceSequenceOrderConfig.AppendAsc, _dataSourceSequenceOrderConfig.OrderPropertyInfo.DeclaringType));
if (useThenBy)
{
reSetOrders.Add(new PropertyOrder(_tableSequenceOrderConfig.PropertyName, _tableSequenceOrderConfig.AppendAsc));
reSetOrders.Add(new PropertyOrder(_tableSequenceOrderConfig.PropertyName, _tableSequenceOrderConfig.AppendAsc, _tableSequenceOrderConfig.OrderPropertyInfo.DeclaringType));
}
}
else
@ -99,41 +105,24 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
sortRouteResults =
sortRouteResults.OrderByDescending(o => o.Tail, _tableSequenceOrderConfig.RouteComparer);
}
reSetOrders.Add(new PropertyOrder(_tableSequenceOrderConfig.PropertyName, _tableSequenceOrderConfig.AppendAsc));
reSetOrders.Add(new PropertyOrder(_tableSequenceOrderConfig.PropertyName, _tableSequenceOrderConfig.AppendAsc, _tableSequenceOrderConfig.OrderPropertyInfo.DeclaringType));
}
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList();
StreamMergeContext.ReSetOrders(reSetOrders);
var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult));
var appendOrderSequenceEnumeratorParallelExecutor = new AppendOrderSequenceEnumeratorParallelExecutor<TEntity>(GetStreamMergeContext(),async);
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,sqlSequenceRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable,
((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, reSetOrders, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
}, cancellationToken);
appendOrderSequenceEnumeratorParallelExecutor, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray();
var streamEnumerators = TaskHelper.WhenAllFastFail(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray();
return streamEnumerators;
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders, ConnectionModeEnum connectionMode)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders))
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new AppendOrderSequenceParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -7,11 +7,16 @@ using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
@ -25,7 +30,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
internal class DefaultShardingEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity> :AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext,new DefaultStreamMergeCombine<TEntity>())
{
}
@ -33,45 +38,16 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
{
cancellationToken.ThrowIfCancellationRequested();
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext, streamMergeAsyncEnumerator);
}, cancellationToken);
var defaultEnumeratorParallelExecutor = new DefaultEnumeratorParallelExecutor<TEntity>(GetStreamMergeContext(),async);
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,defaultSqlRouteUnits, defaultEnumeratorParallelExecutor, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
var streamEnumerators = TaskHelper.WhenAllFastFail(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
return streamEnumerators;
}
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult,ConnectionModeEnum connectionMode)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable,shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators,0, StreamMergeContext.GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return new DefaultEnumeratorParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -1,24 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using System.Collections.Generic;
using System.Threading;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
internal class EmptyQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public EmptyQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
public EmptyQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext,new EmptyStreamMergeCombine<TEntity>())
{
}
@ -35,14 +31,11 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
return new[] { new StreamMergeAsyncEnumerator<TEntity>((IEnumerator<TEntity>)asyncEnumerator) };
}
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreException($"{nameof(EmptyQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
return new EmptyQueryEnumeratorParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -1,19 +1,18 @@
using System;
using System.Collections.Generic;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Helpers;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
@ -28,82 +27,34 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
{
private readonly long _total;
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, long total) : base(streamMergeContext)
public ReverseShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, long total) : base(streamMergeContext,new ReverseStreamMergeCombine<TEntity>())
{
_total = total;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var noPaginationNoOrderQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveAnyOrderBy();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
var take = StreamMergeContext.Take.HasValue?StreamMergeContext.Take.Value:(_total-skip);
var take = StreamMergeContext.Take.HasValue ? StreamMergeContext.Take.Value : (_total - skip);
if (take > int.MaxValue)
throw new ShardingCoreException($"not support take more than {int.MaxValue}");
var realSkip = _total- take- skip;
var realSkip = _total - take - skip;
StreamMergeContext.ReSetSkip((int)realSkip);
var propertyOrders = StreamMergeContext.Orders.Select(o=>new PropertyOrder( o.PropertyExpression,!o.IsAsc)).ToArray();
var propertyOrders = StreamMergeContext.Orders.Select(o => new PropertyOrder(o.PropertyExpression, !o.IsAsc, o.OwnerType)).ToArray();
StreamMergeContext.ReSetOrders(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip + (int)take).OrderWithExpression(propertyOrders);
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,defaultSqlRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) =
CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async,cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
});
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
var reverseEnumeratorParallelExecutor = new ReverseEnumeratorParallelExecutor<TEntity>(GetStreamMergeContext(), reverseOrderQueryable, async);
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async, defaultSqlRouteUnits, reverseEnumeratorParallelExecutor, cancellationToken);
var streamEnumerators = TaskHelper.WhenAllFastFail(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray();
return streamEnumerators;
}
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult,ConnectionModeEnum connectionMode)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable,shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return new InMemoryReverseStreamMergeAsyncEnumerator<TEntity>(doGetStreamMergeAsyncEnumerator);
}
private IStreamMergeAsyncEnumerator<TEntity> DoGetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery() && StreamMergeContext.HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, new[] { multiAggregateOrderStreamMergeAsyncEnumerator });
}
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery() && StreamMergeContext.HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, StreamMergeContext.GetPaginationReWriteTake());
}
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators, 0, StreamMergeContext.GetPaginationReWriteTake());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return new ReverseEnumeratorParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -8,12 +8,19 @@ using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
@ -33,7 +40,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private readonly PaginationSequenceConfig _tableSequenceMatchOrderConfig;
private readonly ICollection<RouteQueryResult<long>> _routeQueryResults;
private readonly bool _isAsc;
public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationSequenceConfig dataSourceSequenceMatchOrderConfig, PaginationSequenceConfig tableSequenceMatchOrderConfig, ICollection<RouteQueryResult<long>> routeQueryResults, bool isAsc) : base(streamMergeContext)
public SequenceEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext, PaginationSequenceConfig dataSourceSequenceMatchOrderConfig, PaginationSequenceConfig tableSequenceMatchOrderConfig, ICollection<RouteQueryResult<long>> routeQueryResults, bool isAsc) : base(streamMergeContext, new SequenceStreamMergeCombine<TEntity>())
{
_dataSourceSequenceMatchOrderConfig = dataSourceSequenceMatchOrderConfig;
_tableSequenceMatchOrderConfig = tableSequenceMatchOrderConfig;
@ -44,7 +51,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
if (skip < 0)
throw new ShardingCoreException("skip must ge 0");
@ -92,31 +98,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var sequenceResults = new SequencePaginationList(sortRouteResults.Select(o => o.RouteQueryResult)).Skip(skip).Take(take).ToList();
var sqlSequenceRouteUnits = sequenceResults.Select(sequenceResult => new SqlSequenceRouteUnit(sequenceResult));
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async,sqlSequenceRouteUnits,
async sqlExecutorUnit =>
{
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(noPaginationQueryable,
((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,streamMergeAsyncEnumerator);
}, cancellationToken);
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
var sequenceEnumeratorParallelExecutor = new SequenceEnumeratorParallelExecutor<TEntity>(GetStreamMergeContext(), async);
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<IStreamMergeAsyncEnumerator<TEntity>>(async, sqlSequenceRouteUnits, sequenceEnumeratorParallelExecutor, cancellationToken);
var streamEnumerators = TaskHelper.WhenAllFastFail(enumeratorTasks).WaitAndUnwrapException().SelectMany(o => o).ToArray();
return streamEnumerators;
}
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult,ConnectionModeEnum connectionMode)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take))
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
return new SequenceEnumeratorParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -5,8 +5,13 @@ using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines;
using ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators;
using ShardingCore.Sharding.MergeEngines.ParallelExecutors;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
@ -19,36 +24,33 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
internal class SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext, new SingleStreamMergeCombine<TEntity>())
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First();
var routeResult = StreamMergeContext.TableRouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult,ConnectionModeEnum.MEMORY_STRICTLY);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
var routeResult = StreamMergeContext.TableRouteResults[0];
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName, routeResult, ConnectionModeEnum.MEMORY_STRICTLY);
var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
var enumeratorParallelExecutor = new SingleQueryEnumeratorParallelExecutor<TEntity>();
if (async)
{
var asyncEnumerator = GetAsyncEnumerator0(newQueryable).WaitAndUnwrapException();
var asyncEnumerator = enumeratorParallelExecutor.GetAsyncEnumerator0(newQueryable).WaitAndUnwrapException();
return new[] { new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator) };
}
else
{
var enumerator = GetEnumerator0(newQueryable);
var enumerator = enumeratorParallelExecutor.GetEnumerator0(newQueryable);
return new[] { new StreamMergeAsyncEnumerator<TEntity>(enumerator) };
}
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
protected override IParallelExecuteControl<IStreamMergeAsyncEnumerator<TEntity>> CreateParallelExecuteControl0(IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>> executor)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
return new SingleQueryEnumeratorParallelExecuteControl<TEntity>(GetStreamMergeContext(), executor, GetStreamMergeCombine());
}
}
}

View File

@ -65,10 +65,10 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
}
//未开启系统分表或者本次查询涉及多张分表
if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSupportPaginationQuery<TShardingDbContext, TEntity>() && _shardingPageManager.Current != null)
if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSingleShardingEntityQuery() && _shardingPageManager.Current != null)
{
//获取虚拟表判断是否启用了分页配置
var shardingEntityType = _streamMergeContext.QueryEntities.FirstOrDefault(o => _entityMetadataManager.IsShardingDataSource(o) || _entityMetadataManager.IsShardingTable(o));
var shardingEntityType = _streamMergeContext.GetSingleShardingEntityType();
if (shardingEntityType == null)
throw new ShardingCoreException($"query not found sharding data source or sharding table entity");
@ -236,7 +236,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
return false;
if (paginationSequenceConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner))
return typeof(TEntity) == paginationSequenceConfig.OrderPropertyInfo.DeclaringType;
return _streamMergeContext.GetSingleShardingEntityType() == paginationSequenceConfig.OrderPropertyInfo.DeclaringType;
if (paginationSequenceConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named))
return propertyOrder.PropertyExpression == paginationSequenceConfig.PropertyName;
return false;

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class AppendOrderSequenceStreamMergeCombine<TEntity>: IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class DefaultStreamMergeCombine<TEntity>:IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
if (streamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class EmptyStreamMergeCombine<TEntity>:IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreInvalidOperationException($"empty query combine has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class ReverseStreamMergeCombine<TEntity>:IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamMergeContext, streamsAsyncEnumerators);
return new InMemoryReverseStreamMergeAsyncEnumerator<TEntity>(doGetStreamMergeAsyncEnumerator);
}
private static IStreamMergeAsyncEnumerator<TEntity> DoGetStreamMergeAsyncEnumerator(StreamMergeContext<TEntity> streamMergeContext, IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamMergeContext.IsPaginationQuery())
{
if (streamMergeContext.HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, new[] { multiAggregateOrderStreamMergeAsyncEnumerator });
}
return new PaginationStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
}
if (streamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class SequenceStreamMergeCombine<TEntity>:IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(streamMergeContext, streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.StreamMergeCombines
{
internal class SingleStreamMergeCombine<TEntity>:IStreamMergeCombine<TEntity>
{
public IStreamMergeAsyncEnumerator<TEntity> StreamMergeEnumeratorCombine(StreamMergeContext<TEntity> streamMergeContext,
IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreInvalidOperationException($"single query has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
}
}
}

View File

@ -7,6 +7,10 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -26,7 +30,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public override async Task<TEntity> DoMergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var notNullResult=result.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
var notNullResult=result.Where(o => o.HasQueryResult()).Select(o => o.QueryResult).ToList();
if (notNullResult.IsEmpty())
throw new InvalidOperationException("Sequence contains no elements.");
@ -37,5 +41,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return notNullResult.First();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -3,9 +3,11 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -37,5 +39,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return notNullResult.FirstOrDefault();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -7,6 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -36,5 +38,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return notNullResult.Last();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
protected override bool ExecuteOrderEqualPropertyOrder()
{
return false;
}
}
}

View File

@ -6,6 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -36,5 +38,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return notNullResult.LastOrDefault();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return AnyElementParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
protected override bool ExecuteOrderEqualPropertyOrder()
{
return false;
}
}
}

View File

@ -6,6 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -39,5 +41,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return result.Sum(o=>o.QueryResult);
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return NoTripParallelExecuteControl<TResult>.Create(GetStreamMergeContext(),executor);
}
}
}

View File

@ -0,0 +1,155 @@
using ShardingCore.Core;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal abstract class AbstractParallelExecuteControl<TResult> : IParallelExecuteControl<TResult>
{
private readonly ISeqQueryProvider _seqQueryProvider;
private readonly IParallelExecutor<TResult> _executor;
/// <summary>
/// not cancelled const mark
/// </summary>
private const int notCancelled = 1;
/// <summary>
/// cancelled const mark
/// </summary>
private const int cancelled = 0;
/// <summary>
/// cancel status
/// </summary>
private int cancelStatus= notCancelled;
protected AbstractParallelExecuteControl(ISeqQueryProvider seqQueryProvider,IParallelExecutor<TResult> executor)
{
_seqQueryProvider = seqQueryProvider??throw new ArgumentNullException(nameof(seqQueryProvider));
_executor = executor;
}
protected ISeqQueryProvider GetSeqQueryProvider()
{
return _seqQueryProvider;
}
public abstract ICircuitBreaker CreateCircuitBreaker();
protected void Cancel()
{
Interlocked.Exchange(ref cancelStatus, cancelled);
}
private bool IsCancelled()
{
return cancelStatus == cancelled;
}
public async Task<LinkedList<TResult>> ExecuteAsync(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
try
{
return await ExecuteAsync0(async, dataSourceSqlExecutorUnit, cancellationToken);
}
catch
{
Cancel();
throw;
}
}
private async Task<LinkedList<TResult>> ExecuteAsync0(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var circuitBreaker = CreateCircuitBreaker();
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
LinkedList<TResult> result = new LinkedList<TResult>();
//同数据库下多组数据间采用串行
foreach (var executorGroup in executorGroups)
{
//同组采用并行最大化用户配置链接数
var routeQueryResults = await ExecuteAsync(executorGroup.Groups, cancellationToken);
//严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
{
MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
var dbContexts = routeQueryResults.Select(o => o.DbContext);
foreach (var dbContext in dbContexts)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
else
{
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult.MergeResult);
}
}
if (IsCancelled()|| circuitBreaker.IsTrip(result))
break;
}
return result;
}
/// <summary>
/// 同库同组下面的并行异步执行,需要归并成一个结果
/// </summary>
/// <param name="sqlExecutorUnits"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync(List<SqlExecutorUnit> sqlExecutorUnits, CancellationToken cancellationToken = new CancellationToken())
{
if (sqlExecutorUnits.Count <= 0)
{
return new LinkedList<ShardingMergeResult<TResult>>();
}
else
{
var result = new LinkedList<ShardingMergeResult<TResult>>();
Task<ShardingMergeResult<TResult>>[] tasks = null;
if (sqlExecutorUnits.Count > 1)
{
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
{
return _executor.ExecuteAsync(sqlExecutorUnit, cancellationToken);
}).ToArray();
}
else
{
tasks = Array.Empty<Task<ShardingMergeResult<TResult>>>();
}
var firstResult = await _executor.ExecuteAsync(sqlExecutorUnits[0], cancellationToken);
result.AddLast(firstResult);
var otherResults = await TaskHelper.WhenAllFastFail(tasks);
foreach (var otherResult in otherResults)
{
result.AddLast(otherResult);
}
return result;
}
}
protected virtual void MergeParallelExecuteResult(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
{
foreach (var parallelResult in parallelResults)
{
previewResults.AddLast(parallelResult);
}
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AllParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AllParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static AllParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AllParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var allCircuitBreaker = new AllCircuitBreaker(GetSeqQueryProvider());
allCircuitBreaker.Register(() =>
{
Cancel();
});
return allCircuitBreaker;
}
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AnyElementParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AnyElementParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider,executor)
{
}
public static AnyElementParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AnyElementParallelExecuteControl<TResult>(seqQueryProvider,executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new AnyElementCircuitBreaker(GetSeqQueryProvider());
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class AnyParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private AnyParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static AnyParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new AnyParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var anyCircuitBreaker = new AnyCircuitBreaker(GetSeqQueryProvider());
anyCircuitBreaker.Register(() =>
{
Cancel();
});
return anyCircuitBreaker;
}
}
}

View File

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal abstract class AbstractCircuitBreaker: ICircuitBreaker
{
private readonly ISeqQueryProvider _seqQueryProvider;
private const int TRIP = 1;
private const int UNTRIP = 0;
private int _trip = UNTRIP;
private Action _afterTrip;
protected AbstractCircuitBreaker(ISeqQueryProvider seqQueryProvider)
{
_seqQueryProvider = seqQueryProvider;
}
protected ISeqQueryProvider GetSeqQueryProvider()
{
return _seqQueryProvider;
}
public bool IsTrip<TResult>(IEnumerable<TResult> results)
{
if (!_seqQueryProvider.IsSeqQuery())
return false;
if (!_seqQueryProvider.IsParallelExecute())
return false;
if (_trip == TRIP)
return true;
if (ConditionalTrip(results))
{
Trip();
return true;
}
return false;
}
protected abstract bool ConditionalTrip<TResult>(IEnumerable<TResult> results);
public void Trip()
{
_trip = TRIP;
_afterTrip?.Invoke();
}
public void Register(Action afterTrip)
{
_afterTrip = afterTrip;
}
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class AllCircuitBreaker:AbstractCircuitBreaker
{
public AllCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
//只要有一个是false就拉闸
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult==false);
}
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class AnyCircuitBreaker:AbstractCircuitBreaker
{
public AnyCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class AnyElementCircuitBreaker : AbstractCircuitBreaker
{
public AnyElementCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
/// <summary>
/// 只要存在任意一个结果那么就直接停止
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="results"></param>
/// <returns></returns>
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return results.Any(o=>o is IRouteQueryResult routeQueryResult&& routeQueryResult.HasQueryResult());
}
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class ContainsCircuitBreaker:AbstractCircuitBreaker
{
public ContainsCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return results.Any(o => o is RouteQueryResult<bool> routeQueryResult && routeQueryResult.QueryResult);
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class NoTripCircuitBreaker:AbstractCircuitBreaker
{
public NoTripCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return false;
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class SingleOrSingleOrDefaultCircuitBreaker : AbstractCircuitBreaker
{
public SingleOrSingleOrDefaultCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool ConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return results
.Where(o => o is IRouteQueryResult routeQueryResult && routeQueryResult.HasQueryResult())
.Take(2).Count() > 1;
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class ContainsParallelExecuteControl<TResult>:AbstractParallelExecuteControl<TResult>
{
public ContainsParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static ContainsParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new ContainsParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new ContainsCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -0,0 +1,81 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal abstract class AbstractEnumeratorParallelExecuteControl<TResult>:AbstractParallelExecuteControl<IStreamMergeAsyncEnumerator<TResult>>
{
private readonly StreamMergeContext<TResult> _streamMergeContext;
protected AbstractEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor) : base(streamMergeContext, executor)
{
_streamMergeContext = streamMergeContext;
}
protected StreamMergeContext<TResult> GetStreamMergeContext()
{
return _streamMergeContext;
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new NoTripCircuitBreaker(GetSeqQueryProvider());
}
protected override void MergeParallelExecuteResult(LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewResults, IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelResults, bool async)
{
var previewResultsCount = previewResults.Count;
if (previewResultsCount > 1)
{
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} {nameof(previewResults)} has more than one element in container");
}
var parallelCount = parallelResults.Count();
if (parallelCount == 0)
return;
//聚合
if (previewResults is LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewInMemoryStreamEnumeratorResults && parallelResults is IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelStreamEnumeratorResults)
{
var mergeAsyncEnumerators = new LinkedList<IStreamMergeAsyncEnumerator<TResult>>();
if (previewResultsCount == 1)
{
mergeAsyncEnumerators.AddLast(previewInMemoryStreamEnumeratorResults.First());
}
foreach (var parallelStreamEnumeratorResult in parallelStreamEnumeratorResults)
{
mergeAsyncEnumerators.AddLast(parallelStreamEnumeratorResult);
}
var combineStreamMergeAsyncEnumerator = CombineInMemoryStreamMergeAsyncEnumerator(mergeAsyncEnumerators.ToArray());
var inMemoryStreamMergeAsyncEnumerator = new InMemoryStreamMergeAsyncEnumerator<TResult>(combineStreamMergeAsyncEnumerator, async);
previewInMemoryStreamEnumeratorResults.Clear();
previewInMemoryStreamEnumeratorResults.AddLast(inMemoryStreamMergeAsyncEnumerator);
//合并
return;
}
throw new ShardingCoreInvalidOperationException($"{typeof(TResult)} is not {typeof(IStreamMergeAsyncEnumerator<TResult>)}");
}
/// <summary>
/// 合并成一个迭代器
/// </summary>
/// <param name="streamsAsyncEnumerators"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators);
public virtual IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return CombineStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class AppendOrderSequenceParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public AppendOrderSequenceParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor,IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class DefaultEnumeratorParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public DefaultEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());//内存聚合分页不可以直接获取skip必须获取skip+take的数目
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class EmptyQueryEnumeratorParallelExecuteControl<TResult>:AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public EmptyQueryEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class ReverseEnumeratorParallelExecuteControl<TResult>: AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public ReverseEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeAsyncEnumerator(
IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
if (GetStreamMergeContext().IsPaginationQuery() && GetStreamMergeContext().HasGroupQuery())
{
var multiAggregateOrderStreamMergeAsyncEnumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators);
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), new[] { multiAggregateOrderStreamMergeAsyncEnumerator }, 0, GetStreamMergeContext().GetPaginationReWriteTake());
}
if (GetStreamMergeContext().IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TResult>(GetStreamMergeContext(), streamsAsyncEnumerators, 0, GetStreamMergeContext().GetPaginationReWriteTake());
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class SequenceEnumeratorParallelExecuteControl<TResult> : AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public SequenceEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
{
internal class SingleQueryEnumeratorParallelExecuteControl<TResult>: AbstractEnumeratorParallelExecuteControl<TResult>
{
private readonly IStreamMergeCombine<TResult> _streamMergeCombine;
public SingleQueryEnumeratorParallelExecuteControl(StreamMergeContext<TResult> streamMergeContext, IParallelExecutor<IStreamMergeAsyncEnumerator<TResult>> executor, IStreamMergeCombine<TResult> streamMergeCombine) : base(streamMergeContext, executor)
{
_streamMergeCombine = streamMergeCombine;
}
public override IStreamMergeAsyncEnumerator<TResult> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TResult>[] streamsAsyncEnumerators)
{
return _streamMergeCombine.StreamMergeEnumeratorCombine(GetStreamMergeContext(), streamsAsyncEnumerators);
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class NoTripParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private NoTripParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static NoTripParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new NoTripParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new NoTripCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls
{
internal class SingleOrSingleOrDefaultParallelExecuteControl<TResult> : AbstractParallelExecuteControl<TResult>
{
private SingleOrSingleOrDefaultParallelExecuteControl(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor) : base(seqQueryProvider, executor)
{
}
public static SingleOrSingleOrDefaultParallelExecuteControl<TResult> Create(ISeqQueryProvider seqQueryProvider, IParallelExecutor<TResult> executor)
{
return new SingleOrSingleOrDefaultParallelExecuteControl<TResult>(seqQueryProvider, executor);
}
public override ICircuitBreaker CreateCircuitBreaker()
{
var circuitBreaker = new SingleOrSingleOrDefaultCircuitBreaker(GetSeqQueryProvider());
circuitBreaker.Register(() =>
{
Cancel();
});
return circuitBreaker;
}
}
}

View File

@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal abstract class AbstractEnumeratorParallelExecutor<TEntity>:IParallelExecutor<IStreamMergeAsyncEnumerator<TEntity>>
{
public abstract Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(
SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// 开启异步线程获取并发迭代器
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
/// <summary>
/// 获取异步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public async Task<IAsyncEnumerator<TEntity>> GetAsyncEnumerator0(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
/// <summary>
/// 获取同步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public IEnumerator<TEntity> GetEnumerator0(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
}
}

View File

@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class AppendOrderSequenceEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly IQueryable<TEntity> _noPaginationQueryable;
private readonly bool _async;
public AppendOrderSequenceEnumeratorParallelExecutor(StreamMergeContext<TEntity> streamMergeContext, bool async)
{
_streamMergeContext = streamMergeContext;
_noPaginationQueryable = streamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake(); ;
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, _streamMergeContext.Orders, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)(_noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders))
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -0,0 +1,44 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class DefaultEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly bool _async;
public DefaultEnumeratorParallelExecutor(StreamMergeContext<TEntity> streamMergeContext,bool async)
{
_streamMergeContext = streamMergeContext;
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext, streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)_streamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class InMemoryParallelExecutor<TEntity,TResult>:IParallelExecutor<RouteQueryResult<TResult>>
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly Func<IQueryable, Task<TResult>> _efQuery;
public InMemoryParallelExecutor(StreamMergeContext<TEntity> streamMergeContext, Func<IQueryable, Task<TResult>> efQuery)
{
_streamMergeContext = streamMergeContext;
_efQuery = efQuery;
}
public async Task<ShardingMergeResult<RouteQueryResult<TResult>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (asyncExecuteQueryable, dbContext) =
CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode);
var queryResult = await _efQuery(asyncExecuteQueryable);
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
return new ShardingMergeResult<RouteQueryResult<TResult>>(dbContext, routeQueryResult);
}
private (IQueryable queryable, DbContext dbContext) CreateAsyncExecuteQueryable(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)_streamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class ReverseEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly IOrderedQueryable<TEntity> _reverseOrderQueryable;
private readonly bool _async;
public ReverseEnumeratorParallelExecutor(StreamMergeContext<TEntity> streamMergeContext,IOrderedQueryable<TEntity> reverseOrderQueryable, bool async)
{
_streamMergeContext = streamMergeContext;
_reverseOrderQueryable = reverseOrderQueryable;
_async = async;
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable, dbContext) =
CreateAsyncExecuteQueryable(dataSourceName, _reverseOrderQueryable, routeResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext,
streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable(string dsname, IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class SequenceEnumeratorParallelExecutor<TEntity>: AbstractEnumeratorParallelExecutor<TEntity>
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly bool _async;
private readonly IQueryable<TEntity> _noPaginationQueryable;
public SequenceEnumeratorParallelExecutor(StreamMergeContext<TEntity> streamMergeContext,bool async)
{
_streamMergeContext = streamMergeContext;
_async = async;
_noPaginationQueryable = streamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
}
public override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
var connectionMode = _streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var (newQueryable, dbContext) = CreateAsyncExecuteQueryable(
((SqlSequenceRouteUnit)sqlExecutorUnit.RouteUnit).SequenceResult, connectionMode);
var streamMergeAsyncEnumerator = await AsyncParallelEnumerator(newQueryable, _async, cancellationToken);
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>(dbContext, streamMergeAsyncEnumerator);
}
private (IQueryable<TEntity>, DbContext) CreateAsyncExecuteQueryable( SequenceResult sequenceResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _streamMergeContext.CreateDbContext(sequenceResult.DSName, sequenceResult.TableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)(_noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take))
.ReplaceDbContextQueryable(shardingDbContext);
return (newQueryable, shardingDbContext);
}
}
}

View File

@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Common;
namespace ShardingCore.Sharding.MergeEngines.ParallelExecutors
{
internal class SingleQueryEnumeratorParallelExecutor<TEntity>:AbstractEnumeratorParallelExecutor<TEntity>
{
public override Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TEntity>>> ExecuteAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}
}
}

View File

@ -9,7 +9,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class RouteQueryResult<TResult>
public interface IRouteQueryResult
{
bool HasQueryResult();
}
public class RouteQueryResult<TResult>: IRouteQueryResult
{
public string DataSourceName { get; }
public TableRouteResult TableRouteResult { get; }
@ -21,5 +25,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
TableRouteResult = tableRouteResult;
QueryResult = queryResult;
}
public bool HasQueryResult()
{
return QueryResult!= null;
}
}
}

View File

@ -6,6 +6,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -32,5 +34,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return notNullResult.Single();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return SingleOrSingleOrDefaultParallelExecuteControl<TResult>.Create(GetStreamMergeContext(), executor);
}
}
}

View File

@ -7,6 +7,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions.ParallelExecutors;
using ShardingCore.Sharding.MergeEngines.ParallelControls;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -36,5 +38,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
throw new InvalidOperationException("Sequence contains more than one element.");
return notNullResult.SingleOrDefault();
}
protected override IParallelExecuteControl<TResult> CreateParallelExecuteControl<TResult>(IParallelExecutor<TResult> executor)
{
return SingleOrSingleOrDefaultParallelExecuteControl<TResult>.Create(GetStreamMergeContext(), executor);
}
}
}

View File

@ -69,7 +69,7 @@ namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
var reWriteOrders = new List<PropertyOrder>(selectProperties.Count());
foreach (var orderProperty in selectProperties)
{
reWriteOrders.Add(new PropertyOrder(orderProperty.PropertyName,true));
reWriteOrders.Add(new PropertyOrder(orderProperty.PropertyName,true, orderProperty.OwnerType));
}
orders = reWriteOrders;
}

View File

@ -20,6 +20,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.NotSupportShardingProviders;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Sharding.EntityQueryConfigurations;
namespace ShardingCore.Sharding
@ -30,7 +32,7 @@ namespace ShardingCore.Sharding
* @Date: Monday, 25 January 2021 11:38:27
* @Email: 326308290@qq.com
*/
public class StreamMergeContext<TEntity> : IDisposable
public class StreamMergeContext<TEntity> : ISeqQueryProvider,IDisposable
#if !EFCORE2
, IAsyncDisposable
#endif
@ -56,7 +58,7 @@ namespace ShardingCore.Sharding
public SelectContext SelectContext { get; }
public GroupByContext GroupByContext { get; }
public IEnumerable<TableRouteResult> TableRouteResults { get; }
public TableRouteResult[] TableRouteResults { get; }
public DataSourceRouteResult DataSourceRouteResult { get; }
/// <summary>
/// 本次查询涉及的对象
@ -75,6 +77,10 @@ namespace ShardingCore.Sharding
private readonly IShardingEntityConfigOptions _shardingEntityConfigOptions;
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
public EntitySeqQueryConfig EntitySeqQueryConfig { get; }
public bool? PrimaryOrderAsc { get; }
private int _maxParallelExecuteCount;
public StreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext,
@ -104,6 +110,34 @@ namespace ShardingCore.Sharding
_shardingEntityConfigOptions = ShardingContainer.GetRequiredShardingEntityConfigOption(mergeQueryCompilerContext.GetShardingDbContextType());
_notSupportShardingProvider = ShardingContainer.GetService<INotSupportShardingProvider>() ?? _defaultNotSupportShardingProvider;
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
_maxParallelExecuteCount = _shardingDbContext.GetVirtualDataSource().ConfigurationParams.MaxQueryConnectionsLimit;
if (IsSingleShardingEntityQuery() && !Skip.HasValue&&IsCrossTable &&!IsUnSupportSharding())
{
var propertyOrders = Orders as PropertyOrder[] ?? Orders.ToArray();
if (propertyOrders.IsNotEmpty())
{
var singleShardingEntityType = GetSingleShardingEntityType();
var primaryOrder = propertyOrders[0];
//不是多级不能是匿名对象
if (primaryOrder.OwnerType == singleShardingEntityType&& !primaryOrder.PropertyExpression.Contains("."))
{
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(MergeQueryCompilerContext.GetShardingDbContextType()));
var virtualTable = virtualTableManager.GetVirtualTable(singleShardingEntityType);
if (virtualTable.EnableEntityQuery && virtualTable.EntityQueryMetadata.TryGetSeqQueryConfig(primaryOrder.PropertyExpression, out var seqQueryConfig))
{
EntitySeqQueryConfig = seqQueryConfig;
PrimaryOrderAsc = primaryOrder.IsAsc;
if (!MergeQueryCompilerContext.IsEnumerableQuery())
{
_maxParallelExecuteCount = Math.Min(seqQueryConfig.ParallelThreadQueryCount,
_maxParallelExecuteCount);
}
}
}
}
}
}
public void ReSetOrders(IEnumerable<PropertyOrder> orders)
{
@ -191,6 +225,15 @@ namespace ShardingCore.Sharding
{
return IsCrossDataSource || IsCrossTable;
}
public bool IsSingleShardingEntityQuery()
{
return QueryEntities.Count(o=>MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o)) == 1;
}
public Type GetSingleShardingEntityType()
{
return QueryEntities.FirstOrDefault(o => MergeQueryCompilerContext.GetEntityMetadataManager().IsSharding(o));
}
//public bool HasAggregateQuery()
//{
// return this.SelectContext.HasAverage();
@ -203,7 +246,7 @@ namespace ShardingCore.Sharding
public int GetMaxQueryConnectionsLimit()
{
return _shardingDbContext.GetVirtualDataSource().ConfigurationParams.MaxQueryConnectionsLimit;
return _maxParallelExecuteCount;
}
public ConnectionModeEnum GetConnectionMode(int sqlCount)
{
@ -335,5 +378,14 @@ namespace ShardingCore.Sharding
}
}
#endif
public bool IsSeqQuery()
{
return EntitySeqQueryConfig != null;
}
public bool IsParallelExecute()
{
return TableRouteResults.Length > GetMaxQueryConnectionsLimit();
}
}
}

View File

@ -10,14 +10,17 @@ namespace ShardingCore.Core.Internal.Visitors
*/
public class PropertyOrder
{
public PropertyOrder(string propertyExpression, bool isAsc)
public PropertyOrder(string propertyExpression, bool isAsc,Type ownerType)
{
PropertyExpression = propertyExpression;
IsAsc = isAsc;
OwnerType = ownerType;
}
public string PropertyExpression { get; set; }
public bool IsAsc { get; set; }
public Type OwnerType { get; }
public override string ToString()
{
return $"{PropertyExpression} {(IsAsc ? "asc" : "desc")}";

View File

@ -93,7 +93,7 @@ namespace ShardingCore.Core.Internal.Visitors
throw new NotSupportedException("sharding order only support property expression");
properties.Reverse();
var propertyExpression = string.Join(".", properties);
_orders.AddFirst(new PropertyOrder(propertyExpression, method.Name == nameof(Queryable.OrderBy) || method.Name == nameof(Queryable.ThenBy)));
_orders.AddFirst(new PropertyOrder(propertyExpression, method.Name == nameof(Queryable.OrderBy) || method.Name == nameof(Queryable.ThenBy), expression.Member.DeclaringType));
}
}

View File

@ -4,7 +4,7 @@
<Version>$(EFCORE6)</Version>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<DefineConstants>TRACE;DEBUG;EFCORE6;</DefineConstants>
<LangVersion>9.0</LangVersion>
<LangVersion>latest</LangVersion>
<RepositoryUrl>https://github.com/xuejmnet/sharding-core</RepositoryUrl>
<PackageIcon>logo.png</PackageIcon>
</PropertyGroup>

View File

@ -4,7 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<DefineConstants>TRACE;DEBUG;EFCORE6Test;</DefineConstants>
<AssemblyName>ShardingCore.Test</AssemblyName>
<LangVersion>10.0</LangVersion>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>