添加平行表关系保证相同表之间不会出现笛卡尔积[#74]

This commit is contained in:
xuejiaming 2021-12-09 15:46:18 +08:00
parent 7bcadfc100
commit 62627e26cc
47 changed files with 554 additions and 319 deletions

View File

@ -51,27 +51,27 @@ namespace Sample.SqlServer
o.AddShardingTableRoute<SysUserSalaryVirtualTableRoute>();
o.AddShardingTableRoute<TestYearShardingVirtualTableRoute>();
}).End();
services.AddShardingDbContext<DefaultShardingDbContext1>(
(conn, o) =>
o.UseSqlServer(conn).UseLoggerFactory(efLogger)
).Begin(o =>
{
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
o.AutoTrackEntity = true;
o.MaxQueryConnectionsLimit = Environment.ProcessorCount;
o.ConnectionMode = ConnectionModeEnum.SYSTEM_AUTO;
//if SysTest entity not exists in db and db is exists
//o.AddEntityTryCreateTable<SysTest>(); // or `o.AddEntitiesTryCreateTable(typeof(SysTest));`
})
//.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger))//无需添加.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) 并发查询系统会自动添加NoTracking
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;")
.AddShardingTableRoute(o =>
{
}).End();
//services.AddShardingDbContext<DefaultShardingDbContext1>(
// (conn, o) =>
// o.UseSqlServer(conn).UseLoggerFactory(efLogger)
// ).Begin(o =>
// {
// o.CreateShardingTableOnStart = true;
// o.EnsureCreatedWithOutShardingTable = true;
// o.AutoTrackEntity = true;
// o.MaxQueryConnectionsLimit = Environment.ProcessorCount;
// o.ConnectionMode = ConnectionModeEnum.SYSTEM_AUTO;
// //if SysTest entity not exists in db and db is exists
// //o.AddEntityTryCreateTable<SysTest>(); // or `o.AddEntitiesTryCreateTable(typeof(SysTest));`
// })
// //.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger))//无需添加.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) 并发查询系统会自动添加NoTracking
// .AddShardingTransaction((connection, builder) =>
// builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
// .AddDefaultDataSource("A",
// "Data Source=localhost;Initial Catalog=ShardingCoreDBXA;Integrated Security=True;")
// .AddShardingTableRoute(o =>
// {
// }).End();
services.AddHealthChecks().AddDbContextCheck<DefaultShardingDbContext>();
//services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(

View File

@ -24,6 +24,7 @@ using ShardingCore.Extensions;
using ShardingCore.Jobs;
using ShardingCore.Jobs.Abstaractions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.TableCreator;
using ShardingCore.Utils;
@ -58,6 +59,7 @@ namespace ShardingCore.Bootstrapers
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
private readonly IShardingTableCreator<TShardingDbContext> _tableCreator;
private readonly IParallelTableManager<TShardingDbContext> _parallelTableManager;
private readonly ILogger<ShardingDbContextBootstrapper<TShardingDbContext>> _logger;
public ShardingDbContextBootstrapper(IShardingConfigOption shardingConfigOption)
@ -68,6 +70,7 @@ namespace ShardingCore.Bootstrapers
_entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<TShardingDbContext>>();
_tableCreator = ShardingContainer.GetService<IShardingTableCreator<TShardingDbContext>>();
_virtualDataSource= ShardingContainer.GetService<IVirtualDataSource<TShardingDbContext>>();
_parallelTableManager = ShardingContainer.GetService<IParallelTableManager<TShardingDbContext>>();
_logger = ShardingContainer.GetService<ILogger<ShardingDbContextBootstrapper<TShardingDbContext>>>();
}
/// <summary>
@ -76,11 +79,21 @@ namespace ShardingCore.Bootstrapers
public void Init()
{
_virtualDataSource.AddPhysicDataSource(new DefaultPhysicDataSource(_shardingConfigOption.DefaultDataSourceName, _shardingConfigOption.DefaultConnectionString, true));
InitializeParallelTables();
InitializeEntityMetadata();
InitializeConfigure();
_virtualDataSource.CheckVirtualDataSource();
}
private void InitializeParallelTables()
{
foreach (var parallelTableGroupNode in _shardingConfigOption.GetParallelTableGroupNodes())
{
_parallelTableManager.AddParallelTable(parallelTableGroupNode);
}
}
private void InitializeEntityMetadata()
{
using (var serviceScope = ShardingContainer.ServiceProvider.CreateScope())

View File

@ -29,6 +29,7 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.TableCreator;
using System;
using ShardingCore.Sharding.ParallelTables;
namespace ShardingCore
{
@ -95,6 +96,7 @@ namespace ShardingCore
services.TryAddSingleton(typeof(ITableRouteRuleEngine<>),typeof(TableRouteRuleEngine<>));
//分表引擎工程
services.TryAddSingleton(typeof(ITableRouteRuleEngineFactory<>),typeof(TableRouteRuleEngineFactory<>));
services.TryAddSingleton(typeof(IParallelTableManager<>),typeof(ParallelTableManager<>));
services.TryAddSingleton<IRouteTailFactory, RouteTailFactory>();
services.TryAddSingleton<IShardingQueryExecutor, DefaultShardingQueryExecutor>();

View File

@ -1,9 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ParallelTables;
namespace ShardingCore.DIExtensions
{
@ -49,6 +52,10 @@ namespace ShardingCore.DIExtensions
{
ShardingConfigOption.AddEntityTryCreateTable(entityType);
}
foreach (var parallelTableGroupNode in shardingCoreBeginOptions.GetParallelTableGroupNodes())
{
ShardingConfigOption.AddParallelTableGroupNode(parallelTableGroupNode);
}
return new ShardingTransactionBuilder<TShardingDbContext>(this);
//return new ShardingQueryBuilder<TShardingDbContext>(this);
@ -114,5 +121,19 @@ namespace ShardingCore.DIExtensions
{
return _createTableEntities;
}
public readonly ISet<ParallelTableGroupNode> _parallelTables = new HashSet<ParallelTableGroupNode>();
public bool AddParallelTables(params Type[] types)
{
if (types.Length <= 0)
throw new ShardingCoreInvalidOperationException(
$"{nameof(AddParallelTables)} args :[{string.Join(",", types.Select(o => o.Name))}] should more than one length");
return _parallelTables.Add(new ParallelTableGroupNode(types.Select(o => new ParallelTableComparerType(o))));
}
public ISet<ParallelTableGroupNode> GetParallelTableGroupNodes()
{
return _parallelTables;
}
}
}

View File

@ -75,6 +75,21 @@ namespace ShardingCore.Extensions
return task.GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="task"></param>
/// <param name="continueOnCapturedContext"></param>
/// <returns>The result of the task.</returns>
/// <exception cref="ArgumentNullException"></exception>
public static TResult WaitAndUnwrapException<TResult>(this Task<TResult> task,bool continueOnCapturedContext)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
return task.ConfigureAwait(continueOnCapturedContext).GetAwaiter().GetResult();
}
/// <summary>
/// Waits for the task to complete, unwrapping any exceptions.
/// </summary>

View File

@ -1,42 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Helpers
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 17 December 2020 16:10:54
* @Email: 326308290@qq.com
*/
/// <summary>
/// 异步转同步,防止ASP.NET中死锁
/// https://cpratt.co/async-tips-tricks/
/// </summary>
public static class AsyncHelper
{
private static readonly TaskFactory _myTaskFactory =
new TaskFactory(CancellationToken.None, TaskCreationOptions.None, TaskContinuationOptions.None, TaskScheduler.Default);
/// <summary>
/// 同步执行
/// </summary>
/// <param name="func">任务</param>
public static void RunSync(Func<Task> func)
{
_myTaskFactory.StartNew(func).Unwrap().GetAwaiter().GetResult();
}
/// <summary>
/// 同步执行
/// </summary>
/// <typeparam name="TResult">返回类型</typeparam>
/// <param name="func">任务</param>
/// <returns></returns>
public static TResult RunSync<TResult>(Func<Task<TResult>> func)
{
return _myTaskFactory.StartNew(func).Unwrap().GetAwaiter().GetResult();
}
}
}

View File

@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ParallelTables;
namespace ShardingCore
{
@ -78,6 +79,10 @@ namespace ShardingCore
/// 连接数限制
/// </summary>
public ConnectionModeEnum ConnectionMode { get; set; }
public bool AddParallelTableGroupNode(ParallelTableGroupNode parallelTableGroupNode);
public ISet<ParallelTableGroupNode> GetParallelTableGroupNodes();
///// <summary>
///// 是否启用表路由编译缓存
///// </summary>

View File

@ -180,11 +180,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
tasks = sqlExecutorUnits.Skip(1).Select(sqlExecutorUnit =>
{
return Task.Run(async () =>
{
return await sqlExecutorUnitExecuteAsync(sqlExecutorUnit);
}, cancellationToken);
return Task.Run(async () => await sqlExecutorUnitExecuteAsync(sqlExecutorUnit), cancellationToken);
}).ToArray();
}
else

View File

@ -1,30 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/10/4 6:25:02
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal interface IAsyncParallelLimit:IDisposable
{
/// <summary>
/// 并发执行方法
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="executeAsync"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,
CancellationToken cancellationToken = new CancellationToken());
bool AsyncParallelContinue<TResult>(List<TResult> results);
}
}

View File

@ -1,6 +1,7 @@
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
@ -12,13 +13,16 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>,IEnsureMergeResult<TResult>
internal abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncMergeEngine<TEntity>, IEnsureMergeResult<TResult>
{
protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public abstract TResult MergeResult();
public virtual TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException(false);
}
public abstract Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken());
}

View File

@ -1,6 +1,7 @@
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
@ -20,7 +21,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
{
}
public abstract TResult MergeResult<TResult>();
public virtual TResult MergeResult<TResult>()
{
return MergeResultAsync<TResult>().WaitAndUnwrapException(false);
}
public abstract Task<TResult> MergeResultAsync<TResult>(
CancellationToken cancellationToken = new CancellationToken());

View File

@ -19,11 +19,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override TResult MergeResult<TResult>()
{
var current = DoMergeResult<TResult>();
return ProcessTrackResult(current);
}
private TResult ProcessTrackResult<TResult>(TResult current)
{
@ -51,7 +46,6 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.Abstract
var current = await DoMergeResultAsync<TResult>(cancellationToken);
return ProcessTrackResult(current);
}
public abstract TResult DoMergeResult<TResult>();
public abstract Task<TResult> DoMergeResultAsync<TResult>(
CancellationToken cancellationToken = new CancellationToken());

View File

@ -1,7 +1,6 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
@ -30,11 +29,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TEnsureResult MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
private async Task<List<RouteQueryResult<AverageResult<T>>>> AggregateAverageResultAsync<T>(CancellationToken cancellationToken = new CancellationToken())
{
return (await base.ExecuteAsync(

View File

@ -1,16 +1,14 @@
using System;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -26,12 +24,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
public MaxAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override TResult MergeResult<TResult>()
{
return MergeResultAsync<TResult>().WaitAndUnwrapException();
}
private TResult GetMaxTResult<TInnerSelect, TResult>(List<RouteQueryResult<TInnerSelect>> source)
{
var routeQueryResults = source.Where(o => o.QueryResult != null).ToList();

View File

@ -1,16 +1,14 @@
using System;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -27,13 +25,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TResult MergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
private TResult GetMinTResult<TInnerSelect, TResult>(List<RouteQueryResult<TInnerSelect>> source)
{
var routeQueryResults = source.Where(o => o.QueryResult != null).ToList();

View File

@ -1,19 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Query;
using ShardingCore.Exceptions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.AggregateExtensions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
@ -30,12 +25,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{
}
public override TEnsureResult MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<TEnsureResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var resultType = typeof(TEnsureResult);

View File

@ -1,16 +1,11 @@
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -27,11 +22,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override bool MergeResult()
{
return AsyncHelper.RunSync(()=> MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AllAsync(_predicate, cancellationToken), cancellationToken);

View File

@ -1,16 +1,10 @@
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -27,11 +21,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override bool MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);

View File

@ -1,12 +1,10 @@
using System;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -23,10 +21,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override bool MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{

View File

@ -1,12 +1,11 @@
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -25,10 +24,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
}
public override int MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{

View File

@ -0,0 +1,54 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
internal class EmptyQueryEnumerator<T> : IAsyncEnumerator<T>,IEnumerator<T>
{
public ValueTask DisposeAsync()
{
return new ValueTask();
}
#if !EFCORE2
public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(false);
}
#endif
#if EFCORE2
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
return Task.FromResult(false);
}
#endif
public bool MoveNext()
{
return false;
}
public void Reset()
{
}
T IEnumerator<T>.Current => default;
object IEnumerator.Current => default;
T IAsyncEnumerator<T>.Current => default;
public void Dispose()
{
}
}
}

View File

@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
internal class EmptyQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public EmptyQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var asyncEnumerator = new EmptyQueryEnumerator<TEntity>();
if (async)
{
return new[] { new StreamMergeAsyncEnumerator<TEntity>((IAsyncEnumerator<TEntity>)asyncEnumerator) };
}
else
{
return new[] { new StreamMergeAsyncEnumerator<TEntity>((IEnumerator<TEntity>)asyncEnumerator) };
}
}
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreException($"{nameof(EmptyQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");
return streamsAsyncEnumerators[0];
}
}
}

View File

@ -52,6 +52,11 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
public IEnumeratorStreamMergeEngine<TEntity> GetMergeEngine()
{
if (_streamMergeContext.DataSourceRouteResult.IntersectDataSources.IsEmpty() ||
_streamMergeContext.TableRouteResults.IsEmpty())
{
return new EmptyQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity>(_streamMergeContext);
}
//本次查询没有跨库没有跨表就可以直接执行
if (!_streamMergeContext.IsCrossDataSource&&!_streamMergeContext.IsCrossTable)
{

View File

@ -1,15 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -26,11 +23,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);

View File

@ -1,20 +1,11 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using ShardingCore.Utils;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -30,12 +21,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public FirstOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{

View File

@ -1,15 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -25,12 +22,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public LastAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{

View File

@ -1,15 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -26,11 +22,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken);

View File

@ -1,18 +1,11 @@
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -31,11 +24,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
_shardingPageManager= ShardingContainer.GetService<IShardingPageManager>();
}
public override long MergeResult()
{
return AsyncHelper.RunSync(() => MergeResultAsync());
}
public override async Task<long> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{

View File

@ -1,7 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -1,15 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -26,11 +22,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);

View File

@ -1,15 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines
{
@ -26,12 +23,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{
}
public override TResult DoMergeResult<TResult>()
{
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
}
public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);

View File

@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ParallelTables
{
internal interface IParallelTableManager
{
/// <summary>
/// 添加平行表
/// </summary>
/// <param name="parallelTableGroupNode"></param>
/// <returns></returns>
bool AddParallelTable(ParallelTableGroupNode parallelTableGroupNode);
/// <summary>
/// 是否是平行表查询
/// </summary>
/// <param name="entityTypes"></param>
/// <returns></returns>
bool IsParallelTableQuery(IEnumerable<Type> entityTypes);
/// <summary>
/// 是否是平行表查询
/// </summary>
/// <param name="parallelTableGroupNode"></param>
/// <returns></returns>
bool IsParallelTableQuery(ParallelTableGroupNode parallelTableGroupNode);
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ParallelTables
{
internal interface IParallelTableManager<TShardingDbContext> : IParallelTableManager
where TShardingDbContext : DbContext, IShardingDbContext
{
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.ExtensionExpressionComparer.Internals;
namespace ShardingCore.Sharding.ParallelTables
{
/// <summary>
/// 单张表对象类型比较器
/// </summary>
public class ParallelTableComparerType : IComparable<ParallelTableComparerType>, IComparable
{
public Type Type { get; }
public ParallelTableComparerType(Type type)
{
Type = type;
}
#region equals hashcode
protected bool Equals(ParallelTableComparerType other)
{
return Type == other.Type;
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((ParallelTableComparerType)obj);
}
public override int GetHashCode()
{
return (Type != null ? Type.GetHashCode() : 0);
}
#endregion
public int CompareTo(ParallelTableComparerType? other)
{
if (Type == null)
return -1;
if (other == null)
return 1;
if (other.Type == null)
return 1;
return this.GetHashCode()-other.GetHashCode();
}
public int CompareTo(object? obj)
{
return CompareTo((Type)obj);
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.ParallelTables
{
/// <summary>
/// 平行表组节点用来表示一组平行表
/// </summary>
public class ParallelTableGroupNode
{
private readonly ISet<ParallelTableComparerType> _entities;
public ParallelTableGroupNode(IEnumerable<ParallelTableComparerType> entities)
{
_entities = new SortedSet<ParallelTableComparerType>(entities);
}
protected bool Equals(ParallelTableGroupNode other)
{
return _entities.SequenceEqual(other._entities);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((ParallelTableGroupNode)obj);
}
public override int GetHashCode()
{
return (_entities != null ? _entities.Sum(o=>o.GetHashCode()) : 0);
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ParallelTables
{
internal sealed class ParallelTableManager<TShardingDbContext> : IParallelTableManager<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly ISet<ParallelTableGroupNode> _parallelTableConfigs = new HashSet<ParallelTableGroupNode>();
public bool AddParallelTable(ParallelTableGroupNode parallelTableGroupNode)
{
return _parallelTableConfigs.Add(parallelTableGroupNode);
}
public bool IsParallelTableQuery(IEnumerable<Type> entityTypes)
{
if (entityTypes.IsEmpty())
return false;
var parallelTableGroupNode = new ParallelTableGroupNode(entityTypes.Select(o=>new ParallelTableComparerType(o)));
return IsParallelTableQuery(parallelTableGroupNode);
}
public bool IsParallelTableQuery(ParallelTableGroupNode parallelTableGroupNode)
{
if (parallelTableGroupNode == null)
return false;
return _parallelTableConfigs.Contains(parallelTableGroupNode);
}
}
}

View File

@ -11,11 +11,14 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
@ -71,20 +74,32 @@ namespace ShardingCore.Sharding
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
private readonly IShardingComparer _shardingComparer;
private readonly IParallelTableManager _parallelTableManager;
private readonly IEntityMetadataManager _entityMetadataManager;
public StreamMergeContext(IQueryable<TEntity> source, IShardingDbContext shardingDbContext,
DataSourceRouteResult dataSourceRouteResult,
IEnumerable<TableRouteResult> tableRouteResults,
IRouteTailFactory routeTailFactory)
{
QueryEntities = source.ParseQueryableEntities();
//_shardingScopeFactory = shardingScopeFactory;
_source = source;
_shardingDbContext = shardingDbContext;
_routeTailFactory = routeTailFactory;
DataSourceRouteResult = dataSourceRouteResult;
TableRouteResults = tableRouteResults;
_parallelTableManager = (IParallelTableManager)ShardingContainer.GetService(typeof(IParallelTableManager<>).GetGenericType0(shardingDbContext.GetType()));
_entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
if (_parallelTableManager.IsParallelTableQuery(QueryEntities.Where(o=> _entityMetadataManager.IsShardingTable(o))))
{
TableRouteResults = tableRouteResults.Where(o=> o.ReplaceTables.Select(p => p.Tail).ToHashSet().Count==1);
}
else
{
TableRouteResults = tableRouteResults;
}
IsCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1;
IsCrossTable = tableRouteResults.Count() > 1;
IsCrossTable = TableRouteResults.Count() > 1;
var reWriteResult = new ReWriteEngine<TEntity>(source).ReWrite();
Skip = reWriteResult.Skip;
Take = reWriteResult.Take;
@ -93,7 +108,6 @@ namespace ShardingCore.Sharding
SelectContext = reWriteResult.SelectContext;
GroupByContext = reWriteResult.GroupByContext;
_reWriteSource = reWriteResult.ReWriteQueryable;
QueryEntities = source.ParseQueryableEntities();
_trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(shardingDbContext.GetType()));

View File

@ -11,6 +11,7 @@ using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ShardingComparision;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
@ -29,6 +30,7 @@ namespace ShardingCore
private readonly Dictionary<Type, Type> _virtualDataSourceRoutes = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _virtualTableRoutes = new Dictionary<Type, Type>();
private readonly ISet<Type> _createTableEntities = new HashSet<Type>();
public readonly ISet<ParallelTableGroupNode> _parallelTables = new HashSet<ParallelTableGroupNode>();
public Action<DbConnection, DbContextOptionsBuilder> SameConnectionConfigure { get; private set; }
public Action<string, DbContextOptionsBuilder> DefaultQueryConfigure { get; private set; }
@ -234,6 +236,17 @@ namespace ShardingCore
public string DefaultConnectionString { get; set; }
public int MaxQueryConnectionsLimit { get; set; } = Environment.ProcessorCount;
public ConnectionModeEnum ConnectionMode { get; set; } = ConnectionModeEnum.SYSTEM_AUTO;
public bool AddParallelTableGroupNode(ParallelTableGroupNode parallelTableGroupNode)
{
Check.NotNull(parallelTableGroupNode, $"{nameof(parallelTableGroupNode)}");
return _parallelTables.Add(parallelTableGroupNode);
}
public ISet<ParallelTableGroupNode> GetParallelTableGroupNodes()
{
return _parallelTables;
}
//public bool? EnableTableRouteCompileCache { get; set; }
//public bool? EnableDataSourceRouteCompileCache { get; set; }
}

View File

@ -20,6 +20,7 @@ using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
@ -214,6 +215,13 @@ namespace ShardingCore.Test
}
Assert.Equal($"{emptyTailIdentity},{aTailIdentity},{bTailIdentity}", string.Join(",", dics.Keys));
}
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass
@ -347,6 +355,17 @@ namespace ShardingCore.Test
[Fact]
public async Task ToList_Join_Test()
{
var list111 = await (from u in _virtualDbContext.Set<SysUserMod>()
join salary in _virtualDbContext.Set<SysUserSalary>()
on u.Id equals salary.UserId
select new
{
u.Id,
u.Age,
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
}).CountAsync();
var list = await (from u in _virtualDbContext.Set<SysUserMod>()
join salary in _virtualDbContext.Set<SysUserSalary>()
on u.Id equals salary.UserId

View File

@ -21,6 +21,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using ShardingCore.Sharding.ParallelTables;
using Xunit;
namespace ShardingCore.Test
@ -175,6 +176,13 @@ namespace ShardingCore.Test
var virtualTable = _virtualTableManager.GetVirtualTable<SysUserMod>();
Assert.NotNull(virtualTable);
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -43,6 +43,7 @@ namespace ShardingCore.Test
#endif //o.MaxQueryConnectionsLimit = 1;
o.AutoTrackEntity = true;
//o.AddParallelTables(typeof(SysUserMod), typeof(SysUserSalary));
})
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))

View File

@ -18,6 +18,7 @@ using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
@ -203,6 +204,13 @@ namespace ShardingCore.Test2x
}
Assert.Equal($"{emptyTailIdentity},{aTailIdentity},{bTailIdentity}", string.Join(",", dics.Keys));
}
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -15,6 +15,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.TableCreator;
@ -173,6 +174,13 @@ namespace ShardingCore.Test2x
var virtualTable = _virtualTableManager.GetVirtualTable<SysUserMod>();
Assert.NotNull(virtualTable);
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -18,6 +18,7 @@ using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
@ -202,6 +203,13 @@ namespace ShardingCore.Test3x
}
Assert.Equal($"{emptyTailIdentity},{aTailIdentity},{bTailIdentity}", string.Join(",", dics.Keys));
}
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -16,6 +16,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.TableCreator;
@ -175,6 +176,13 @@ namespace ShardingCore.Test3x
var virtualTable = _virtualTableManager.GetVirtualTable<SysUserMod>();
Assert.NotNull(virtualTable);
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -18,6 +18,7 @@ using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
@ -202,6 +203,13 @@ namespace ShardingCore.Test5x
}
Assert.Equal($"{emptyTailIdentity},{aTailIdentity},{bTailIdentity}", string.Join(",", dics.Keys));
}
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass

View File

@ -16,6 +16,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingComparision.Abstractions;
using ShardingCore.TableCreator;
@ -175,6 +176,13 @@ namespace ShardingCore.Test5x
var virtualTable = _virtualTableManager.GetVirtualTable<SysUserMod>();
Assert.NotNull(virtualTable);
var x1x1 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserMod)), new ParallelTableComparerType(typeof(SysUserSalary)) });
var x2x2 = new ParallelTableGroupNode(new HashSet<ParallelTableComparerType>()
{ new ParallelTableComparerType(typeof(SysUserSalary)),new ParallelTableComparerType(typeof(SysUserMod)), });
Assert.Equal(x1x1, x2x2);
Assert.Equal(x1x1.GetHashCode(), x2x2.GetHashCode());
}
public class SequenceClass