[#127]添加enumerator的顺序查询,并且发布x.4.1.9

This commit is contained in:
xuejiaming 2022-03-01 13:36:36 +08:00
parent b07af78224
commit d289b2a849
10 changed files with 134 additions and 12 deletions

View File

@ -1,9 +1,9 @@
:start
::定义版本
set EFCORE2=2.4.1.08
set EFCORE3=3.4.1.08
set EFCORE5=5.4.1.08
set EFCORE6=6.4.1.08
set EFCORE2=2.4.1.09
set EFCORE3=3.4.1.09
set EFCORE5=5.4.1.09
set EFCORE6=6.4.1.09
::删除所有bin与obj下的文件
@echo off

View File

@ -10,9 +10,11 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using ShardingCore;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions.ShardingQueryableExtensions;
@ -45,6 +47,14 @@ namespace Sample.SqlServer.Controllers
//var tableRouteResults = tableRouteRuleEngineFactory.Route(queryable);
var virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<DefaultShardingDbContext>>();
var virtualTable = virtualTableManager.GetVirtualTable<SysUserMod>();
var physicTable1s = virtualTable.RouteTo(new ShardingTableRouteConfig(shardingKeyValue:"123"));//获取值为123的所有分片
Expression<Func<SysUserMod, bool>> where = o => o.Id == "123";
var physicTable2s = virtualTable.RouteTo(new ShardingTableRouteConfig(predicate: where));//获取表达式o.Id == "123"的所有路由
var allPhysicTables = virtualTable.GetAllPhysicTables();
var virtualTableRoute = virtualTable.GetVirtualRoute();
var allTails = virtualTableRoute.GetAllTails();
Console.WriteLine("------------------Get2x------------------------");
@ -214,6 +224,20 @@ namespace Sample.SqlServer.Controllers
});
}
[HttpGet]
public async Task<IActionResult> Get2a3()
{
Console.WriteLine("Get2a3-------------");
var sysUserMods = await _defaultTableDbContext.Set<SysUserSalary>().UseConnectionMode(2).Skip(2).Take(2).OrderByDescending(o=>o.DateOfMonth).ToListAsync();
return Ok(sysUserMods);
}
[HttpGet]
public async Task<IActionResult> Get2a4()
{
Console.WriteLine("Get2a4-------------");
var sysUserMods = await _defaultTableDbContext.Set<SysUserSalary>().UseConnectionMode(2).AsNoSequence().Skip(2).Take(2).OrderByDescending(o => o.DateOfMonth).ToListAsync();
return Ok(sysUserMods);
}
[HttpGet]
public IActionResult Get2([FromQuery] int p, [FromQuery] int s)
{
Stopwatch sp = new Stopwatch();

View File

@ -1,9 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Debug",
"Microsoft.Hosting.Lifetime": "Debug"
"Default": "Information",
"Microsoft": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Abstractions
{
internal interface IParseContext
{
int? GetSkip();
int? GetTake();
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
internal interface IInMemoryStreamMergeAsyncEnumerator
{
int GetReallyCount();
}
internal interface IInMemoryStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>, IInMemoryStreamMergeAsyncEnumerator
{
}
}

View File

@ -15,17 +15,23 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal class InMemoryReverseStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
internal class InMemoryReverseStreamMergeAsyncEnumerator<T> : IInMemoryStreamMergeAsyncEnumerator<T>
{
private readonly IStreamMergeAsyncEnumerator<T> _inMemoryStreamMergeAsyncEnumerator;
private bool _first = true;
private IEnumerator<T> _reverseEnumerator;
private int _inMemoryReallyCount;
public InMemoryReverseStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> inMemoryStreamMergeAsyncEnumerator)
{
_inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator;
}
public int GetReallyCount()
{
return _inMemoryReallyCount;
}
#if !EFCORE2
public async ValueTask DisposeAsync()
{
await _inMemoryStreamMergeAsyncEnumerator.DisposeAsync();
@ -40,6 +46,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
while (await _inMemoryStreamMergeAsyncEnumerator.MoveNextAsync())
{
_reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent());
_inMemoryReallyCount++;
}
_reverseEnumerator = _reverseCollection.GetEnumerator();
@ -58,6 +65,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
while (await _inMemoryStreamMergeAsyncEnumerator.MoveNext(cancellationToken))
{
_reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent());
_inMemoryReallyCount++;
}
_reverseEnumerator = _reverseCollection.GetEnumerator();

View File

@ -9,11 +9,12 @@ using ShardingCore.Extensions;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
internal class InMemoryStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
internal class InMemoryStreamMergeAsyncEnumerator<T> : IInMemoryStreamMergeAsyncEnumerator<T>
{
private readonly bool _async;
private readonly IEnumerator<T> _inMemoryEnumerator;
private bool skip;
private int _inMemoryReallyCount;
public InMemoryStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> asyncSource, bool async)
{
@ -40,6 +41,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
#endif
{
linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent());
_inMemoryReallyCount++;
}
return linkedList.GetEnumerator();
@ -55,6 +57,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
#endif
{
linkedList.AddLast(streamMergeAsyncEnumerator.GetCurrent());
_inMemoryReallyCount++;
}
return linkedList.GetEnumerator();
@ -69,7 +72,12 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
return false;
}
public int GetReallyCount()
{
return _inMemoryReallyCount;
}
#if !EFCORE2
public ValueTask DisposeAsync()
{
_inMemoryEnumerator.Dispose();

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.MergeEngines.ParallelControls.CircuitBreakers
{
internal class EnumeratorCircuitBreaker : AbstractCircuitBreaker
{
public EnumeratorCircuitBreaker(ISeqQueryProvider seqQueryProvider) : base(seqQueryProvider)
{
}
protected override bool SeqConditionalTrip<TResult>(IEnumerable<TResult> results)
{
var parseContext = (IParseContext)GetSeqQueryProvider();
var take = parseContext.GetTake();
if (take.HasValue)
{
return (take.Value+ parseContext.GetSkip().GetValueOrDefault()) <= results.Sum(o =>
{
if (o is IInMemoryStreamMergeAsyncEnumerator inMemoryStreamMergeAsyncEnumerator)
{
return inMemoryStreamMergeAsyncEnumerator.GetReallyCount();
}
return 0;
});
}
return false;
}
protected override bool RandomConditionalTrip<TResult>(IEnumerable<TResult> results)
{
return false;
}
}
}

View File

@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.MergeEngines.ParallelControls.Enumerators
}
public override ICircuitBreaker CreateCircuitBreaker()
{
return new NoTripCircuitBreaker(GetSeqQueryProvider());
return new EnumeratorCircuitBreaker(GetSeqQueryProvider());
}
protected override void MergeParallelExecuteResult(LinkedList<IStreamMergeAsyncEnumerator<TResult>> previewResults, IEnumerable<IStreamMergeAsyncEnumerator<TResult>> parallelResults, bool async)

View File

@ -34,7 +34,7 @@ namespace ShardingCore.Sharding
* @Date: Monday, 25 January 2021 11:38:27
* @Email: 326308290@qq.com
*/
public class StreamMergeContext<TEntity> : ISeqQueryProvider, IDisposable,IPrint
public class StreamMergeContext<TEntity> : ISeqQueryProvider, IParseContext, IDisposable,IPrint
#if !EFCORE2
, IAsyncDisposable
#endif
@ -122,7 +122,7 @@ namespace ShardingCore.Sharding
var maxParallelExecuteCount = _shardingDbContext.GetVirtualDataSource().ConfigurationParams.MaxQueryConnectionsLimit;
var connectionMode = _shardingDbContext.GetVirtualDataSource().ConfigurationParams.ConnectionMode;
if (IsSingleShardingEntityQuery() && !Skip.HasValue && IsCrossTable && !IsNotSupportSharding())
if (IsSingleShardingEntityQuery() && IsCrossTable && !IsNotSupportSharding())
{
var singleShardingEntityType = GetSingleShardingEntityType();
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(MergeQueryCompilerContext.GetShardingDbContextType()));
@ -503,5 +503,15 @@ namespace ShardingCore.Sharding
return
$"stream merge context:[max query connections limit:{GetMaxQueryConnectionsLimit()}],[is use read write separation:{IsUseReadWriteSeparation()}],[is parallel query:{IsParallelQuery()}],[is not support sharding:{IsNotSupportSharding()}],[is sequence query:{IsSeqQuery()}],[can trip:{CanTrip()}],[is route not match:{IsRouteNotMatch()}],[throw if query route not match:{ThrowIfQueryRouteNotMatch()}],[is pagination query:{IsPaginationQuery()}],[has group query:{HasGroupQuery()}],[is merge query:{IsMergeQuery()}],[is single sharding entity query:{IsSingleShardingEntityQuery()}]";
}
public int? GetSkip()
{
return Skip;
}
public int? GetTake()
{
return Take;
}
}
}