优化代码添加Partition分区扩展方法

This commit is contained in:
xuejiaming 2022-02-16 08:46:24 +08:00
parent f6a44e8169
commit 35508d5b1b
3 changed files with 65 additions and 78 deletions

View File

@ -51,6 +51,18 @@ namespace ShardingCore.Extensions
{
return values.Contains(thisValue);
}
/// <summary>
/// 按size分区,每个区size个数目
/// </summary>
/// <typeparam name="TSource"></typeparam>
/// <param name="elements"></param>
/// <param name="size"></param>
/// <returns></returns>
public static IEnumerable<List<TSource>> Partition<TSource>(this IEnumerable<TSource> elements,int size)
{
return elements.Select((o, i) => new { Element = o, Index = i / size })
.GroupBy(o => o.Index).Select(o => o.Select(g => g.Element).ToList());
}
}
}

View File

@ -8,8 +8,15 @@ namespace ShardingCore.Sharding.Abstractions
{
public interface ISeqQueryProvider
{
/// <summary>
/// 是否是顺序查询
/// </summary>
/// <returns></returns>
bool IsSeqQuery();
/// <summary>
/// 是否可以终端:本次查询n张表,链接数限制m,当n>m时则会出现串行查询才需要中断
/// </summary>
/// <returns></returns>
bool CanTrip();
}
}

View File

@ -10,6 +10,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
@ -60,39 +61,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
return await parallelExecuteControl.ExecuteAsync(async, dataSourceSqlExecutorUnit, cancellationToken);
}
// var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
// LinkedList<TResult> result = new LinkedList<TResult>();
// //同数据库下多组数据间采用串行
// foreach (var executorGroup in executorGroups)
// {
// //同组采用并行最大化用户配置链接数
// var routeQueryResults = await ExecuteAsync<TResult>(executorGroup.Groups, sqlExecutorUnitExecuteAsync, cancellationToken);
// //严格限制连接数就在内存中进行聚合并且直接回收掉当前dbcontext
// if (dataSourceSqlExecutorUnit.ConnectionMode == ConnectionModeEnum.CONNECTION_STRICTLY)
// {
// MergeParallelExecuteResult(result, routeQueryResults.Select(o => o.MergeResult), async);
// var dbContexts = routeQueryResults.Select(o => o.DbContext);
// foreach (var dbContext in dbContexts)
// {
//#if !EFCORE2
// await dbContext.DisposeAsync();
//#endif
//#if EFCORE2
// dbContext.Dispose();
//#endif
// }
// }
// else
// {
// foreach (var routeQueryResult in routeQueryResults)
// {
// result.AddLast(routeQueryResult.MergeResult);
// }
// }
// }
// return result;
}, cancellationToken);
}).ToArray();
return waitTaskQueue;
@ -153,53 +121,53 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
// : sqlCount / maxQueryConnectionsLimit + 1, 1);
//计算应该使用那种链接模式
ConnectionModeEnum connectionMode = streamMergeContext.GetConnectionMode(sqlCount);
var sqlExecutorUnitPartitions = sqlGroups
.Select((o, i) => new { Obj = o, index = i / maxQueryConnectionsLimit })
.GroupBy(o => o.index)
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList())
.ToList();
//将SqlExecutorUnit进行分区,每个区maxQueryConnectionsLimit个
//[1,2,3,4,5,6,7],maxQueryConnectionsLimit=3,结果就是[[1,2,3],[4,5,6],[7]]
var sqlExecutorUnitPartitions = sqlGroups.Select(o => new SqlExecutorUnit(connectionMode, o)).Partition(maxQueryConnectionsLimit);
var sqlExecutorGroups = sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(connectionMode, o)).ToList();
return new DataSourceSqlExecutorUnit(connectionMode, sqlExecutorGroups);
}
/// <summary>
/// 同库同组下面的并行异步执行,需要归并成一个结果
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="sqlExecutorUnits"></param>
/// <param name="sqlExecutorUnitExecuteAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
{
if (sqlExecutorUnits.Count <= 0)
{
return new LinkedList<ShardingMergeResult<TResult>>();
}
else
{
var result = new LinkedList<ShardingMergeResult<TResult>>();
Task<ShardingMergeResult<TResult>>[] tasks = null;
if (sqlExecutorUnits.Count > 1)
{
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
{
return Task.Run(async () => await sqlExecutorUnitExecuteAsync(sqlExecutorUnit), cancellationToken);
}).ToArray();
}
else
{
tasks = Array.Empty<Task<ShardingMergeResult<TResult>>>();
}
var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]);
result.AddLast(firstResult);
var otherResults = await TaskHelper.WhenAllFastFail(tasks);
foreach (var otherResult in otherResults)
{
result.AddLast(otherResult);
}
///// <summary>
///// 同库同组下面的并行异步执行,需要归并成一个结果
///// </summary>
///// <typeparam name="TResult"></typeparam>
///// <param name="sqlExecutorUnits"></param>
///// <param name="sqlExecutorUnitExecuteAsync"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//protected async Task<LinkedList<ShardingMergeResult<TResult>>> ExecuteAsync<TResult>(List<SqlExecutorUnit> sqlExecutorUnits, Func<SqlExecutorUnit, Task<ShardingMergeResult<TResult>>> sqlExecutorUnitExecuteAsync, CancellationToken cancellationToken = new CancellationToken())
//{
// if (sqlExecutorUnits.Count <= 0)
// {
// return new LinkedList<ShardingMergeResult<TResult>>();
// }
// else
// {
// var result = new LinkedList<ShardingMergeResult<TResult>>();
// Task<ShardingMergeResult<TResult>>[] tasks = null;
// if (sqlExecutorUnits.Count > 1)
// {
// tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
// {
// return Task.Run(async () => await sqlExecutorUnitExecuteAsync(sqlExecutorUnit), cancellationToken);
// }).ToArray();
// }
// else
// {
// tasks = Array.Empty<Task<ShardingMergeResult<TResult>>>();
// }
// var firstResult = await sqlExecutorUnitExecuteAsync(sqlExecutorUnits[0]);
// result.AddLast(firstResult);
// var otherResults = await TaskHelper.WhenAllFastFail(tasks);
// foreach (var otherResult in otherResults)
// {
// result.AddLast(otherResult);
// }
return result;
}
}
// return result;
// }
//}
}
}