This commit is contained in:
xuejmnet 2021-09-02 21:19:46 +08:00
parent 2f0a6b3af9
commit 880b45e486
6 changed files with 118 additions and 30 deletions

View File

@ -0,0 +1,30 @@
using System;
using System.Linq;
using ShardingCore.Sharding;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 02 September 2021 20:46:24
* @Email: 326308290@qq.com
*/
public static class StreamMergeContextExtension
{
/// <summary>
/// 本次查询是否涉及到分表
/// </summary>
/// <param name="streamMergeContext"></param>
/// <typeparam name="TEntity"></typeparam>
/// <returns></returns>
public static bool IsShardingQuery<TEntity>(this StreamMergeContext<TEntity> streamMergeContext)
{
return streamMergeContext.RouteResults.Count() > 1;
}
public static bool IsSingleShardingTableQuery<TEntity>(this StreamMergeContext<TEntity> streamMergeContext)
{
return streamMergeContext.RouteResults.First().ReplaceTables.Count(o => o.EntityType.IsShardingTable()) == 1;
}
}
}

View File

@ -14,7 +14,7 @@ namespace ShardingCore.Sharding.Abstractions
public interface IShardingQueryExecutor
{
/// <summary>
/// 同步执行获取结果
/// ͬ<EFBFBD><EFBFBD>ִ<EFBFBD>л<EFBFBD>ȡ<EFBFBD><EFBFBD><EFBFBD>
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="currentContext"></param>
@ -22,7 +22,7 @@ namespace ShardingCore.Sharding.Abstractions
/// <returns></returns>
TResult Execute<TResult>(ICurrentDbContext currentContext, Expression query);
/// <summary>
/// 异步执行获取结果
/// <EFBFBD>첽ִ<EFBFBD>л<EFBFBD>ȡ<EFBFBD><EFBFBD><EFBFBD>
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="currentContext"></param>

View File

@ -1,8 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.ShardingQueryExecutors
{
@ -13,16 +19,31 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class EnumeratorShardingQueryExecutor<TEntity>:IShardingQueryExecutor<TEntity>
public class EnumeratorShardingQueryExecutor<TEntity>
{
public MethodCallExpression GetQueryExpression()
{
throw new NotImplementedException();
}
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly IShardingPageManager _shardingPageManager;
public IShardingDbContext GetCurrentShardingDbContext()
public EnumeratorShardingQueryExecutor(StreamMergeContext<TEntity> streamMergeContext)
{
throw new NotImplementedException();
_streamMergeContext = streamMergeContext;
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
}
public IEnumeratorStreamMergeEngine<TEntity> ExecuteAsync(CancellationToken cancellationToken = new CancellationToken())
{
//操作单表
if (!_streamMergeContext.IsShardingQuery())
{
return new SingleQueryEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
//未开启系统分表
if (_shardingPageManager.Current == null)
{
return new DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>(_streamMergeContext);
}
}
}
}

View File

@ -42,15 +42,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
await enumator.MoveNextAsync();
return enumator;
}
public virtual IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
var useOriginal = routeCount > 1;
DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
// public virtual IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult)
// {
// var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
// var useOriginal = StreamMergeContext > 1;
// DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
// var newQueryable = (IQueryable<TEntity>)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
// .ReplaceDbContextQueryable(shardingDbContext);
// return newQueryable;
// }
public override IEnumerator<TEntity> GetEnumerator()
{

View File

@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -18,26 +18,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class NormalEnumeratorAsyncStreamMergeEngine<TEntity>:AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class DefaultShardingEnumeratorAsyncStreamMergeEngine<TEntity>:AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
private readonly bool _multiRouteQuery;
public NormalEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
_multiRouteQuery = streamMergeContext.RouteResults.Count() > 1;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var tableResult = StreamMergeContext.RouteResults;
var routeCount = tableResult.Count();
var enumeratorTasks = tableResult.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult);
return Task.Run(async () =>
{
try
{
var newQueryable = CreateAsyncExecuteQueryable(routeResult, routeCount);
var asyncEnumerator = await DoGetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
@ -49,22 +45,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
return streamEnumerators;
}
public override IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult, int routeCount)
private IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult)
{
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>)(_multiRouteQuery ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
var newQueryable = (IQueryable<TEntity>)StreamMergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (_multiRouteQuery && StreamMergeContext.HasSkipTake())
if (StreamMergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);

View File

@ -0,0 +1,41 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 02 September 2021 20:58:10
* @Email: 326308290@qq.com
*/
public class SingleQueryEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
{
public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators()
{
var routeResult = StreamMergeContext.RouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
DbContextQueryStore.TryAdd(routeResult, shardingDbContext);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException();
return new[] {new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator)};
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
return new MultiOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);
}
}
}