This commit is contained in:
xuejiaming 2021-08-25 13:05:45 +08:00
parent a15a4f61af
commit 229d5c1d3d
9 changed files with 73 additions and 33 deletions

View File

@ -1,8 +1,8 @@
:start
::定义版本
set EFCORE2=2.2.0.11-pre
set EFCORE3=3.2.0.11-pre
set EFCORE5=5.2.0.11-pre
set EFCORE2=2.2.0.12
set EFCORE3=3.2.0.12
set EFCORE5=5.2.0.12
::删除所有bin与obj下的文件
@echo off

View File

@ -11,6 +11,10 @@ using ShardingCore.Extensions;
namespace Sample.SqlServer.Controllers
{
public class STU
{
public string Id { get; set; }
}
[ApiController]
[Route("[controller]/[action]")]
public class ValuesController : ControllerBase
@ -26,12 +30,12 @@ namespace Sample.SqlServer.Controllers
[HttpGet]
public async Task<IActionResult> Get()
{
var resultx11231 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o=>o.Id).ContainsAsync("1981");
var resultx1121 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").SumAsync(o=>o.Age);
var resultx11231 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age == 198198).Select(o => o.Id).ContainsAsync("1981");
var resultx1121 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").SumAsync(o => o.Age);
var resultx111 = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "198");
var resultx2 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync(o => o.Age<=10);
var resultx2 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync(o => o.Age <= 10);
var resultx = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefaultAsync();
var resultx33 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o=>o.Id).FirstOrDefaultAsync();
var resultx33 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefaultAsync();
var resulxxt = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").ToListAsync();
var result = await _defaultTableDbContext.Set<SysUserMod>().ToListAsync();
@ -50,9 +54,13 @@ namespace Sample.SqlServer.Controllers
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");
_defaultTableDbContext.Attach(sysUserMod98);
sysUserMod98.Name = "name_update"+new Random().Next(1,99)+"_98";
sysUserMod98.Name = "name_update" + new Random().Next(1, 99) + "_98";
await _defaultTableDbContext.SaveChangesAsync();
return Ok(result);
var stu = new STU() { Id = "198"};
var sresultx111x = _defaultTableDbContext.Set<SysUserMod>().FirstOrDefault(o => o.Id == stu.Id);
var pageResult = await _defaultTableDbContext.Set<SysUserMod>().Skip(10).Take(10).OrderBy(o => o.Age).ToListAsync();
return Ok(sresultx111);
}
}
}

View File

@ -64,7 +64,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
if (_take.HasValue)
{
realTake++;
if (realTake >= _take.Value)
if (realTake > _take.Value)
return false;
}
}

View File

@ -49,7 +49,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeSync
if (_take.HasValue)
{
realTake++;
if (realTake >= _take.Value)
if (realTake > _take.Value)
return false;
}
}

View File

@ -30,18 +30,18 @@ namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
//去除分页,获取前Take+Skip数量
var reWriteQueryable = _queryable;
if (take.HasValue)
{
reWriteQueryable = _queryable.RemoveTake();
}
if (skip.HasValue)
{
reWriteQueryable = _queryable.RemoveSkip();
}
if (take.HasValue&& skip.GetValueOrDefault()>0)
{
reWriteQueryable = _queryable.RemoveTake();
}
if (take.HasValue)
reWriteQueryable = reWriteQueryable.Take(take.Value + skip.GetValueOrDefault());
//if (take.HasValue)
// reWriteQueryable = reWriteQueryable.Take(take.Value + skip.GetValueOrDefault());
//包含group by
if (extraEntry.GroupByContext.GroupExpression != null)
{

View File

@ -68,11 +68,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines
}
#endif
private IQueryable<T> CreateAsyncExecuteQueryable(RouteResult routeResult)
private IQueryable<T> CreateAsyncExecuteQueryable(RouteResult routeResult,int routeCount)
{
var shardingDbContext = _mergeContext.CreateDbContext(routeResult);
var useOriginal = routeCount>1;
_parllelDbbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable()
var newQueryable = (IQueryable<T>)(useOriginal?_mergeContext.GetReWriteQueryable():_mergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
@ -80,13 +81,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
private IAsyncEnumerator<T> GetShardingEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult);
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
@ -100,7 +102,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (_mergeContext.HasSkipTake())
if (routeCount>1&&_mergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
@ -118,13 +120,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
return Task.Run(() =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult);
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var enumerator = GetEnumerator(newQueryable);
return new StreamMergeEnumerator<T>(enumerator);
@ -138,7 +141,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (_mergeContext.HasSkipTake())
if (routeCount > 1 && _mergeContext.HasSkipTake())
return new PaginationStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeEnumerator<T>(_mergeContext, streamEnumerators);

View File

@ -99,6 +99,17 @@ namespace ShardingCore.Test50
Assert.Equal(descAge, sysUserMod.Age);
descAge--;
}
var pageResult = await _virtualDbContext.Set<SysUserMod>().Skip(10).Take(10).OrderByDescending(o => o.Age).ToListAsync();
Assert.Equal(10, pageResult.Count);
int pageDescAge = 990;
foreach (var sysUserMod in pageResult)
{
Assert.Equal(pageDescAge, sysUserMod.Age);
pageDescAge--;
}
}
[Fact]

View File

@ -96,6 +96,15 @@ namespace ShardingCore.Test50_2x
Assert.Equal(descAge, sysUserMod.Age);
descAge--;
}
var pageResult = await _virtualDbContext.Set<SysUserMod>().Skip(10).Take(10).OrderByDescending(o => o.Age).ToListAsync();
Assert.Equal(10, pageResult.Count);
int pageDescAge = 990;
foreach (var sysUserMod in pageResult)
{
Assert.Equal(pageDescAge, sysUserMod.Age);
pageDescAge--;
}
}
[Fact]

View File

@ -96,6 +96,15 @@ namespace ShardingCore.Test50_3x
Assert.Equal(descAge, sysUserMod.Age);
descAge--;
}
var pageResult = await _virtualDbContext.Set<SysUserMod>().Skip(10).Take(10).OrderByDescending(o => o.Age).ToListAsync();
Assert.Equal(10, pageResult.Count);
int pageDescAge = 990;
foreach (var sysUserMod in pageResult)
{
Assert.Equal(pageDescAge, sysUserMod.Age);
pageDescAge--;
}
}
[Fact]