迭代器查询也支持链接模式

This commit is contained in:
xuejiaming 2021-12-05 01:15:45 +08:00
parent 055416d3d9
commit 9ec692ed75
16 changed files with 406 additions and 144 deletions

View File

@ -43,27 +43,13 @@ namespace Sample.SqlServer
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=ShardingCoreDBA;Integrated Security=True;")
.AddShardingDataSource(sp =>
{
return new Dictionary<string, string>()
{
{ "B", "Data Source=localhost;Initial Catalog=ShardingCoreDBB;Integrated Security=True;" }
};
}).AddShardingDataSourceRoute(op=>{})
"Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;")
.AddShardingTableRoute(o =>
{
o.AddShardingTableRoute<SysUserModVirtualTableRoute>();
o.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
o.AddShardingTableRoute<TestYearShardingVirtualTableRoute>();
}).AddReadWriteSeparation(sp =>
{
return new Dictionary<string, IEnumerable<string>>()
{
{"A",new List<string>(){"Data Source=localhost;Initial Catalog=ShardingCoreDBA1;Integrated Security=True;","Data Source=localhost;Initial Catalog=ShardingCoreDBA2;Integrated Security=True;"}},
{"B",new List<string>(){"Data Source=localhost;Initial Catalog=ShardingCoreDBB1;Integrated Security=True;"}}
};
},ReadStrategyEnum.Loop,true,defaultPriority:10,ReadConnStringGetStrategyEnum.LatestFirstTime).End();
}).End();
services.AddHealthChecks().AddDbContextCheck<DefaultShardingDbContext>();
//services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(

View File

@ -8,16 +8,16 @@ namespace ShardingCore.Core
{
/// <summary>
/// 链接模式,可以由用户自行指定,使用内存限制,和连接数限制或者系统自行选择最优
/// 内存限制的意思是最小化内存使用率,就是非一次性获取所有数据然后采用流式聚合
/// 连接限制的意思是最小化连接数使用率,就是单次查询并发连接数为设置的连接数。因为有限制,所以无法一直挂起连接,必须全部获取到内存后进行内存聚合
/// STREAM_MERGE的意思是最小化内存使用率,就是非一次性获取所有数据然后采用流式聚合
/// IN_MEMORY_MERGE的意思是最小化连接并发数,就是单次查询并发连接数为设置的连接数<see cref="IShardingConfigOption.MaxQueryConnectionsLimit"/>。因为有限制,所以无法一直挂起连接,必须全部获取到内存后进行内存聚合
/// 系统自行选择会根据用户的配置采取最小化连接数但是如果遇到分页则会根据分页策略采取内存限制因为skip过大会导致内存爆炸
/// </summary>
public enum ConnectionModeEnum
{
//内存限制使用流式聚合
MEMORY_LIMIT,
//链接限制使用内存聚合
CONNECTION_LIMIT,
//流式聚合 同时会有多个链接
STREAM_MERGE,
//内存聚合 连接数会有限制
IN_MEMORY_MERGE,
//系统自动选择内存还是流式聚合
SYSTEM_AUTO
}

View File

@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
namespace ShardingCore.Extensions
{
internal static class ConnectionModeExtension
{
public static async Task<T> ReleaseConnectionAsync<T>(this Task<T> executeTask, DbContext dbContext,
ConnectionModeEnum connectionMode)
{
try
{
return await executeTask;
}
finally
{
if (connectionMode == ConnectionModeEnum.IN_MEMORY_MERGE)
{
#if !EFCORE2
await dbContext.DisposeAsync();
#endif
#if EFCORE2
dbContext.Dispose();
#endif
}
}
}
}
}

View File

@ -0,0 +1,156 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
internal class InMemoryStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly IEnumerator<T> _inMemoryEnumerator;
private bool skip;
public InMemoryStreamMergeAsyncEnumerator(IAsyncEnumerator<T> asyncSource)
{
if (_inMemoryEnumerator != null)
throw new ArgumentNullException(nameof(_inMemoryEnumerator));
_inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException();
_inMemoryEnumerator.MoveNext();
skip = true;
}
private async Task<IEnumerator<T>> GetAllRowsAsync(IAsyncEnumerator<T> asyncSource)
{
var linkedList = new LinkedList<T>();
if (asyncSource.Current != null)
{
linkedList.AddLast(asyncSource.Current);
#if !EFCORE2
while (await asyncSource.MoveNextAsync())
#endif
#if EFCORE2
while (await asyncSource.MoveNext())
#endif
{
linkedList.AddLast(asyncSource.Current);
}
}
return linkedList.GetEnumerator();
}
public InMemoryStreamMergeAsyncEnumerator(IEnumerator<T> syncSource)
{
if (_inMemoryEnumerator != null)
throw new ArgumentNullException(nameof(_inMemoryEnumerator));
_inMemoryEnumerator = GetAllRows(syncSource);
_inMemoryEnumerator.MoveNext();
skip = true;
}
private IEnumerator<T> GetAllRows(IEnumerator<T> syncSource)
{
var linkedList = new LinkedList<T>();
if (syncSource.Current != null)
{
linkedList.AddLast(syncSource.Current);
while (syncSource.MoveNext())
{
linkedList.AddLast(syncSource.Current);
}
}
return linkedList.GetEnumerator();
}
public bool SkipFirst()
{
if (skip)
{
skip = false;
return true;
}
return false;
}
#if !EFCORE2
public ValueTask DisposeAsync()
{
_inMemoryEnumerator.Dispose();
return new ValueTask();
}
public ValueTask<bool> MoveNextAsync()
{
if (skip)
{
skip = false;
return new ValueTask<bool>(null != _inMemoryEnumerator.Current);
}
return new ValueTask<bool>(_inMemoryEnumerator.MoveNext());
}
public void Dispose()
{
_inMemoryEnumerator?.Dispose();
}
#endif
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _inMemoryEnumerator.Current;
}
return _inMemoryEnumerator.MoveNext();
}
public bool HasElement()
{
return null != _inMemoryEnumerator.Current;
}
public void Reset()
{
_inMemoryEnumerator.Reset();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();
public T ReallyCurrent => GetReallyCurrent();
public T GetCurrent()
{
if (skip)
return default;
return _inMemoryEnumerator.Current;
}
public T GetReallyCurrent()
{
return _inMemoryEnumerator.Current;
}
#if EFCORE2
public void Dispose()
{
_inMemoryEnumerator?.Dispose();
}
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (skip)
{
skip = false;
return Task.FromResult(null != _inMemoryEnumerator.Current);
}
return Task.FromResult(_inMemoryEnumerator.MoveNext());
}
#endif
}
}

View File

@ -9,6 +9,8 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Sharding.MergeEngines.Common;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
@ -22,14 +24,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
internal abstract class AbstractBaseMergeEngine<TEntity>
{
/// <summary>
/// 异步多线程控制并发
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="executeAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected abstract StreamMergeContext<TEntity> GetStreamMergeContext();
///// <summary>
///// 异步多线程控制并发
///// </summary>
///// <typeparam name="TResult"></typeparam>
///// <param name="executeAsync"></param>
///// <param name="cancellationToken"></param>
///// <returns></returns>
//public Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
//{
// cancellationToken.ThrowIfCancellationRequested();
@ -64,23 +66,91 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
//}
public Task<LinkedList<TResult2>>[] GetDataSourceGroupAndExecutorGroup<TResult,TResult2>(Func<SqlExecutorUnit,Task<TResult2>> sqlExecutorUnitExecuteAsync,CancellationToken cancellationToken=new CancellationToken())
{
var streamMergeContext = GetStreamMergeContext();
var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit();
var waitTaskQueue = streamMergeContext.DataSourceRouteResult.IntersectDataSources.SelectMany(
dataSourceName =>
{
return streamMergeContext.TableRouteResults.Select(routeResult =>
new SqlRouteUnit(dataSourceName, routeResult));
}).GroupBy(o => o.DataSourceName).Select(sqlGroups =>
{
var sqlCount = sqlGroups.Count();
//根据用户配置单次查询期望并发数
int exceptCount =
Math.Max(
0 == sqlCount % maxQueryConnectionsLimit
? sqlCount / maxQueryConnectionsLimit
: sqlCount / maxQueryConnectionsLimit + 1, 1);
//计算应该使用那种链接模式
ConnectionModeEnum connectionMode = CalcConnectionMode(streamMergeContext.GetConnectionMode(),
streamMergeContext.GetUseMemoryLimitWhileSkip(), maxQueryConnectionsLimit, sqlCount,
streamMergeContext.Skip);
var sqlExecutorUnitPartitions = sqlGroups
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode, g.Obj)).ToList()).ToList();
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
}).Select(executorGroups =>
{
return Task.Run(async () =>
{
LinkedList<TResult2> result = new LinkedList<TResult2>();
foreach (var executorGroup in executorGroups)
{
var executorGroupParallelExecuteTasks = executorGroup.Groups.Select(executor =>
{
return Task.Run(async () =>
{
return await sqlExecutorUnitExecuteAsync(executor);
//var dataSourceName = executor.RouteUnit.DataSourceName;
//var routeResult = executor.RouteUnit.TableRouteResult;
//var asyncExecuteQueryable =
// CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
//var queryResult = await efQuery(asyncExecuteQueryable);
//return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//return await AsyncParallelResultExecute(asyncExecuteQueryable, dataSourceName,
// routeResult, efQuery,
// cancellationToken);
}, cancellationToken);
}).ToArray();
var routeQueryResults = (await Task.WhenAll(executorGroupParallelExecuteTasks)).ToList();
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult);
}
}
return result;
}, cancellationToken);
}).ToArray();
return waitTaskQueue;
}
protected ConnectionModeEnum CalcConnectionMode(ConnectionModeEnum currentConnectionMode, int useMemoryLimitWhileSkip, int maxQueryConnectionsLimit, int sqlCount,int? skip)
{
switch (currentConnectionMode)
{
case ConnectionModeEnum.MEMORY_LIMIT:
case ConnectionModeEnum.CONNECTION_LIMIT: return currentConnectionMode;
case ConnectionModeEnum.STREAM_MERGE:
case ConnectionModeEnum.IN_MEMORY_MERGE: return currentConnectionMode;
default:
{
if (skip.GetValueOrDefault() > useMemoryLimitWhileSkip)
if (skip.HasValue && skip.Value > useMemoryLimitWhileSkip)
{
return ConnectionModeEnum.MEMORY_LIMIT;
return ConnectionModeEnum.STREAM_MERGE;
}
return maxQueryConnectionsLimit < sqlCount
? ConnectionModeEnum.CONNECTION_LIMIT
: ConnectionModeEnum.MEMORY_LIMIT; ;
? ConnectionModeEnum.IN_MEMORY_MERGE
: ConnectionModeEnum.STREAM_MERGE; ;
}
}
}

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
@ -23,7 +24,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>,IInMemoryAsyncMergeEngine<TEntity>
internal abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IInMemoryAsyncMergeEngine<TEntity>
{
private readonly MethodCallExpression _methodCallExpression;
private readonly StreamMergeContext<TEntity> _mergeContext;
@ -72,80 +73,33 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
/// <returns></returns>
protected abstract IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression);
private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult)
private (IQueryable queryable, DbContext dbContext) CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var shardingDbContext = _mergeContext.CreateDbContext(dsname, tableRouteResult);
var shardingDbContext = _mergeContext.CreateDbContext(dsname, tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var newCombineQueryable = DoCombineQueryable<TResult>(newQueryable);
return newCombineQueryable
;
return (newCombineQueryable, shardingDbContext);
;
}
public async Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var dataSourceRouteResult = _mergeContext.DataSourceRouteResult;
var maxQueryConnectionsLimit = _mergeContext.GetMaxQueryConnectionsLimit();
var waitExecuteQueue = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
return _mergeContext.TableRouteResults.Select(routeResult =>new SqlRouteUnit(dataSourceName,routeResult));
}).GroupBy(o=>o.DataSourceName).Select(sqlGroups =>
{
var sqlCount = sqlGroups.Count();
//根据用户配置单次查询期望并发数
int exceptCount =
Math.Max(
0 == sqlCount % maxQueryConnectionsLimit
? sqlCount / maxQueryConnectionsLimit
: sqlCount / maxQueryConnectionsLimit + 1, 1);
//计算应该使用那种链接模式
ConnectionModeEnum connectionMode = CalcConnectionMode(_mergeContext.GetConnectionMode(),
_mergeContext.GetUseMemoryLimitWhileSkip(), maxQueryConnectionsLimit, sqlCount, 0);
var sqlExecutorUnitPartitions = sqlGroups
.Select((o, i) => new { Obj = o, index = i % exceptCount }).GroupBy(o => o.index)
.Select(o => o.Select(g => new SqlExecutorUnit(connectionMode,g.Obj)).ToList()).ToList();
return sqlExecutorUnitPartitions.Select(o => new SqlExecutorGroup<SqlExecutorUnit>(o)).ToList();
})
.Select(executorGroups =>
{
return Task.Run(async() =>
var waitExecuteQueue = GetDataSourceGroupAndExecutorGroup<TResult, RouteQueryResult<TResult>>(
async sqlExecutorUnit =>
{
LinkedList<RouteQueryResult<TResult>> result = new LinkedList<RouteQueryResult<TResult>>();
foreach (var executorGroup in executorGroups)
{
var executorGroupParallelExecuteTasks = executorGroup.Groups.Select(executor =>
{
return Task.Run(async () =>
{
var dataSourceName = executor.RouteUnit.DataSourceName;
var routeResult = executor.RouteUnit.TableRouteResult;
var connectionMode = _mergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var asyncExecuteQueryable =
CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
var (asyncExecuteQueryable, dbContext) =
CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult, connectionMode);
var queryResult = await efQuery(asyncExecuteQueryable).ReleaseConnectionAsync(dbContext, connectionMode);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
}).ToArray();
var queryResult = await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//return await AsyncParallelResultExecute(asyncExecuteQueryable, dataSourceName,
// routeResult, efQuery,
// cancellationToken);
},cancellationToken);
}).ToArray();
var routeQueryResults = (await Task.WhenAll(executorGroupParallelExecuteTasks)).ToList();
foreach (var routeQueryResult in routeQueryResults)
{
result.AddLast(routeQueryResult);
}
}
return result;
},cancellationToken);
}).ToArray();
return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o=>o).ToList();
return (await Task.WhenAll(waitExecuteQueue)).SelectMany(o => o).ToList();
}
@ -173,7 +127,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
return queryable;
}
public StreamMergeContext<TEntity> GetStreamMergeContext()
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
{
return _mergeContext;
}

View File

@ -16,7 +16,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
*/
internal interface IInMemoryAsyncMergeEngine<TEntity>
{
StreamMergeContext<TEntity> GetStreamMergeContext();
Task<List<RouteQueryResult<TResult>>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery,
CancellationToken cancellationToken = new CancellationToken());
}

View File

@ -5,9 +5,11 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@ -26,6 +28,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
{
public StreamMergeContext<TEntity> StreamMergeContext { get; }
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
{
return StreamMergeContext;
}
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
@ -98,21 +104,22 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="connectionMode"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,
public async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumerator(IQueryable<TEntity> queryable, bool async,ConnectionModeEnum connectionMode,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
return connectionMode==ConnectionModeEnum.STREAM_MERGE? new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
return connectionMode == ConnectionModeEnum.STREAM_MERGE ? new StreamMergeAsyncEnumerator<TEntity>(enumerator):new InMemoryStreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}

View File

@ -9,6 +9,7 @@ namespace ShardingCore.Sharding.MergeEngines.Common
{
public class SqlExecutorUnit
{
public SqlExecutorUnit(ConnectionModeEnum connectionMode, SqlRouteUnit routeUnit)
{
ConnectionMode = connectionMode;

View File

@ -0,0 +1,37 @@
//using System;
//using System.Collections.Generic;
//using System.Diagnostics.CodeAnalysis;
//using System.Linq;
//using System.Text;
//using System.Threading.Tasks;
//using Microsoft.EntityFrameworkCore;
//using ShardingCore.Core;
//namespace ShardingCore.Sharding.MergeEngines
//{
// internal class ConnectionModeTemplate
// {
// [ExcludeFromCodeCoverage]
// private ConnectionModeTemplate() { }
// public static async Task<T> ExecuteAsync<T>(Func<Task<T>> func, DbContext dbContext, ConnectionModeEnum connectionMode)
// {
// try
// {
// return await func();
// }
// finally
// {
// if (connectionMode == ConnectionModeEnum.IN_MEMORY_MERGE)
// {
//#if !EFCORE
// await dbContext.DisposeAsync();
//#endif
//#if EFCORE2
// dbContext.Dispose();
//#endif
// }
// }
// }
// }
//}

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
@ -107,7 +108,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return Task.Run(async () =>
{
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult, reSetOrders);
return await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return await AsyncParallelEnumerator(newQueryable, async, ConnectionModeEnum.STREAM_MERGE, cancellationToken);
});
}).ToArray();
@ -117,7 +118,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname, IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult, IEnumerable<PropertyOrder> reSetOrders)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult);
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult, ConnectionModeEnum.STREAM_MERGE);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take).OrderWithExpression(reSetOrders))
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;

View File

@ -2,6 +2,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -28,31 +29,27 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
var tableRouteResults = StreamMergeContext.TableRouteResults;
return tableRouteResults.Select(routeResult =>
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<TEntity, IStreamMergeAsyncEnumerator<TEntity>>(
async sqlExecutorUnit =>
{
return Task.Run(async () =>
{
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult);
return await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
},cancellationToken);
});
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) = CreateAsyncExecuteQueryable(dataSourceName, routeResult, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async,connectionMode, cancellationToken)
.ReleaseConnectionAsync(dbContext, connectionMode);
}, cancellationToken);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
return streamEnumerators;
}
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult)
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(string dsname,TableRouteResult tableRouteResult,ConnectionModeEnum connectionMode)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult);
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
return (newQueryable,shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
@ -45,30 +46,28 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
StreamMergeContext.ReSetOrders(propertyOrders);
var reverseOrderQueryable = noPaginationNoOrderQueryable.Take((int)realSkip+(int)take).OrderWithExpression(propertyOrders);
var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
return StreamMergeContext.TableRouteResults.Select(routeResult =>
var enumeratorTasks = GetDataSourceGroupAndExecutorGroup<TEntity, IStreamMergeAsyncEnumerator<TEntity>>(
async sqlExecutorUnit =>
{
return Task.Run(async () =>
{
var newQueryable =
CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult);
return await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
});
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
var (newQueryable,dbContext) =
CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult, connectionMode);
return await AsyncParallelEnumerator(newQueryable, async, connectionMode, cancellationToken).ReleaseConnectionAsync(dbContext,connectionMode);
});
}).ToArray();;
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException().SelectMany(o=>o).ToArray();
return streamEnumerators;
}
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult)
private (IQueryable<TEntity>,DbContext) CreateAsyncExecuteQueryable(string dsname,IQueryable<TEntity> reverseOrderQueryable, TableRouteResult tableRouteResult,ConnectionModeEnum connectionMode)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult);
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname,tableRouteResult, connectionMode);
var newQueryable = (IQueryable<TEntity>)reverseOrderQueryable
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
return (newQueryable,shardingDbContext);
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
@ -95,7 +96,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
{
var newQueryable =
CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult);
return await AsyncParallelEnumerator(newQueryable, async, cancellationToken);
return await AsyncParallelEnumerator(newQueryable, async, ConnectionModeEnum.STREAM_MERGE, cancellationToken);
});
}).ToArray();
@ -105,7 +106,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
private IQueryable<TEntity> CreateAsyncExecuteQueryable(string dsname, IQueryable<TEntity> noPaginationQueryable, SequenceResult sequenceResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult);
var shardingDbContext = StreamMergeContext.CreateDbContext(dsname, sequenceResult.TableRouteResult, ConnectionModeEnum.STREAM_MERGE);
var newQueryable = (IQueryable<TEntity>)(noPaginationQueryable.Skip(sequenceResult.Skip).Take(sequenceResult.Take))
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;

View File

@ -1,6 +1,7 @@
using System.Linq;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -27,7 +28,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
cancellationToken.ThrowIfCancellationRequested();
var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First();
var routeResult = StreamMergeContext.TableRouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult);
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult,ConnectionModeEnum.STREAM_MERGE);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
if (async)
{
@ -48,5 +49,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
}
}
}

View File

@ -133,19 +133,37 @@ namespace ShardingCore.Sharding
/// </summary>
/// <param name="dataSourceName">data source name</param>
/// <param name="tableRouteResult"></param>
/// <param name="connectionMode"></param>
/// <returns></returns>
public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult)
public DbContext CreateDbContext(string dataSourceName, TableRouteResult tableRouteResult, ConnectionModeEnum connectionMode)
{
var routeTail = _routeTailFactory.Create(tableRouteResult);
//如果开启了读写分离或者本次查询是跨表或者跨库的表示本次查询的dbcontext是不存储的用完后就直接dispose
var parallelQuery = IsParallelQuery();
var dbContext = _shardingDbContext.GetDbContext(dataSourceName, parallelQuery, routeTail);
if (parallelQuery)
if (parallelQuery && RealConnectionMode(connectionMode) == ConnectionModeEnum.STREAM_MERGE)
{
_parallelDbContexts.TryAdd(dbContext, null);
}
return dbContext;
}
/// <summary>
/// 因为并发查询情况下那么你是内存就是内存你是流式就是流式
/// 如果不是并发查询的情况下系统会将当前dbcontext进行利用起来所以只能是流式
/// </summary>
/// <param name="connectionMode"></param>
/// <returns></returns>
public ConnectionModeEnum RealConnectionMode(ConnectionModeEnum connectionMode)
{
if (IsParallelQuery())
{
return connectionMode;
}
else
{
return ConnectionModeEnum.STREAM_MERGE;
}
}
//public IRouteTail Create(TableRouteResult tableRouteResult)
//{