完成对分表查询的核心线程数控制,并且支持查询并发带timeout超时时间
This commit is contained in:
parent
74c6f7f474
commit
6dedd98c9d
11
README.md
11
README.md
|
@ -2,6 +2,14 @@
|
|||
|
||||
|
||||
`ShardingCore` 易用、简单、高性能、普适性,是一款扩展针对efcore生态下的分表分库的扩展解决方案,支持efcore2+的所有版本,支持efcore2+的所有数据库、支持自定义路由、动态路由、高性能分页、读写分离的一款组件,如果你喜欢这组件或者这个组件对你有帮助请[点我star github 地址](https://github.com/xuejmnet/sharding-core)
|
||||
|
||||
---
|
||||
|
||||
<div align="center">
|
||||
<p> <a href="https://github.com/xuejmnet/sharding-core">Github Star</a> 助力dotnet 生态 <a href="https://gitee.com/dotnetchina/sharding-core">Gitee Star</a> </p>
|
||||
</div>
|
||||
|
||||
---
|
||||
### 依赖
|
||||
|
||||
Release | EF Core | .NET Standard | .NET (Core)
|
||||
|
@ -522,7 +530,8 @@ or
|
|||
|
||||
## 默认路由
|
||||
分库提供了默认的路由分表则需要自己去实现,具体实现可以参考分库
|
||||
抽象abstract | 路由规则 | tail | 索引
|
||||
|
||||
抽象abstract | 路由规则 | tail | 索引
|
||||
--- |--- |--- |---
|
||||
AbstractSimpleShardingModKeyIntVirtualTableRoute |取模 |0,1,2... | `=,contains`
|
||||
AbstractSimpleShardingModKeyStringVirtualTableRoute |取模 |0,1,2... | `=,contains`
|
||||
|
|
|
@ -37,6 +37,8 @@ namespace ShardingCore.DIExtensions
|
|||
shardingCoreBeginOptionsConfigure?.Invoke(shardingCoreBeginOptions);
|
||||
ShardingConfigOption.EnsureCreatedWithOutShardingTable = shardingCoreBeginOptions.EnsureCreatedWithOutShardingTable;
|
||||
ShardingConfigOption.AutoTrackEntity = shardingCoreBeginOptions.AutoTrackEntity;
|
||||
ShardingConfigOption.ParallelQueryMaxThreadCount = shardingCoreBeginOptions.ParallelQueryMaxThreadCount;
|
||||
ShardingConfigOption.ParallelQueryTimeOut = shardingCoreBeginOptions.ParallelQueryTimeOut;
|
||||
ShardingConfigOption.CreateShardingTableOnStart = shardingCoreBeginOptions.CreateShardingTableOnStart;
|
||||
ShardingConfigOption.IgnoreCreateTableError = shardingCoreBeginOptions.IgnoreCreateTableError;
|
||||
return new ShardingQueryBuilder<TShardingDbContext, TActualDbContext>(this);
|
||||
|
@ -75,6 +77,15 @@ namespace ShardingCore.DIExtensions
|
|||
/// </summary>
|
||||
public bool AutoTrackEntity { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 单次查询并发线程数目(-1表示不限制)
|
||||
/// </summary>
|
||||
public int ParallelQueryMaxThreadCount { get; set; } = 65536;
|
||||
/// <summary>
|
||||
/// 默认30秒超时
|
||||
/// </summary>
|
||||
public TimeSpan ParallelQueryTimeOut { get; set; } = TimeSpan.FromSeconds(30);
|
||||
|
||||
/// <summary>
|
||||
/// 忽略建表时的错误
|
||||
/// </summary>
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Text;
|
||||
|
||||
namespace ShardingCore.Exceptions
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: 2021/10/3 22:25:59
|
||||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public class ShardingCoreParallelQueryTimeOutException:ShardingCoreException
|
||||
{
|
||||
public ShardingCoreParallelQueryTimeOutException()
|
||||
{
|
||||
}
|
||||
|
||||
protected ShardingCoreParallelQueryTimeOutException(SerializationInfo info, StreamingContext context) : base(info, context)
|
||||
{
|
||||
}
|
||||
|
||||
public ShardingCoreParallelQueryTimeOutException(string message) : base(message)
|
||||
{
|
||||
}
|
||||
|
||||
public ShardingCoreParallelQueryTimeOutException(string message, Exception innerException) : base(message, innerException)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
|
@ -43,6 +43,15 @@ namespace ShardingCore
|
|||
/// </summary>
|
||||
public bool AutoTrackEntity { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 单次查询并发线程数目(1-65536)
|
||||
/// </summary>
|
||||
public int ParallelQueryMaxThreadCount { get; set; }
|
||||
/// <summary>
|
||||
/// 并发查询超时时间
|
||||
/// </summary>
|
||||
public TimeSpan ParallelQueryTimeOut { get; set; }
|
||||
|
||||
public string DefaultDataSourceName { get; set; }
|
||||
public string DefaultConnectionString { get; set; }
|
||||
}
|
||||
|
|
|
@ -80,11 +80,6 @@ namespace ShardingCore.Sharding
|
|||
return _shardingDbContextExecutor.IsUseReadWriteSeparation();
|
||||
}
|
||||
|
||||
public bool EnableAutoTrack()
|
||||
{
|
||||
return _shardingDbContextExecutor.EnableAutoTrack();
|
||||
}
|
||||
|
||||
|
||||
public override EntityEntry Add(object entity)
|
||||
{
|
||||
|
|
|
@ -37,8 +37,6 @@ namespace ShardingCore.Sharding.Abstractions
|
|||
/// <returns></returns>
|
||||
bool IsUseReadWriteSeparation();
|
||||
|
||||
bool EnableAutoTrack();
|
||||
|
||||
}
|
||||
|
||||
public interface IShardingDbContext<T> : IShardingDbContext where T : DbContext
|
||||
|
|
|
@ -44,7 +44,6 @@ namespace ShardingCore.Sharding.Abstractions
|
|||
/// <returns></returns>
|
||||
bool IsUseReadWriteSeparation();
|
||||
|
||||
bool EnableAutoTrack();
|
||||
/// <summary>
|
||||
/// create sharding db context options
|
||||
/// </summary>
|
||||
|
|
|
@ -1,6 +1,17 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.Sharding.Enumerators;
|
||||
using ShardingCore.Sharding.MergeEngines.ParallelControl;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||
{
|
||||
|
@ -13,5 +24,110 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
|||
*/
|
||||
public abstract class AbstractBaseMergeEngine<TEntity>
|
||||
{
|
||||
|
||||
private readonly SemaphoreSlim _semaphore;
|
||||
private readonly Expression _executeExpression;
|
||||
private readonly TimeSpan _parallelQueryTimeOut;
|
||||
|
||||
public AbstractBaseMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
|
||||
{
|
||||
_executeExpression = methodCallExpression;
|
||||
var shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>()
|
||||
.FirstOrDefault(o => o.ShardingDbContextType == shardingDbContext.GetType())??throw new ArgumentNullException(nameof(IShardingConfigOption));
|
||||
_semaphore = new SemaphoreSlim(Math.Max(1, shardingConfigOption.ParallelQueryMaxThreadCount));
|
||||
_parallelQueryTimeOut = shardingConfigOption.ParallelQueryTimeOut;
|
||||
}
|
||||
public AbstractBaseMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
|
||||
{
|
||||
_executeExpression = streamMergeContext.GetOriginalQueryable().Expression;
|
||||
_semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount()));
|
||||
_parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut();
|
||||
}
|
||||
/// <summary>
|
||||
/// 执行异步并发
|
||||
/// </summary>
|
||||
/// <param name="queryable"></param>
|
||||
/// <param name="dataSourceName"></param>
|
||||
/// <param name="routeResult"></param>
|
||||
/// <param name="efQuery"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public Task<RouteQueryResult<TResult>> AsyncParallelResultExecuteAsync<TResult>(IQueryable queryable, string dataSourceName, TableRouteResult routeResult, Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return AsyncParallelControlExecuteAsync(async () =>
|
||||
{
|
||||
var queryResult =
|
||||
await AsyncParallelResultExecuteAsync0<TResult>(queryable, efQuery, cancellationToken);
|
||||
|
||||
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
|
||||
}, cancellationToken);
|
||||
}
|
||||
/// <summary>
|
||||
/// 异步执行并发的实际方法
|
||||
/// </summary>
|
||||
/// <param name="queryable"></param>
|
||||
/// <param name="efQuery"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken());
|
||||
/// <summary>
|
||||
/// 执行异步并发
|
||||
/// </summary>
|
||||
/// <param name="queryable"></param>
|
||||
/// <param name="async"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync(IQueryable<TEntity> queryable, bool async, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return AsyncParallelControlExecuteAsync(async () => await AsyncParallelEnumeratorExecuteAsync0(queryable, async, cancellationToken), cancellationToken);
|
||||
}
|
||||
/// <summary>
|
||||
/// 异步多线程控制并发
|
||||
/// </summary>
|
||||
/// <typeparam name="TResult"></typeparam>
|
||||
/// <param name="executeAsync"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public Task<TResult> AsyncParallelControlExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
|
||||
{
|
||||
var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds;
|
||||
var acquired = this._semaphore.Wait((int)parallelTimeOut);
|
||||
if (acquired)
|
||||
{
|
||||
var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
|
||||
try
|
||||
{
|
||||
return Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
return await executeAsync();
|
||||
}
|
||||
finally
|
||||
{
|
||||
once.Release();
|
||||
}
|
||||
}, cancellationToken);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
once.Release();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint());
|
||||
}
|
||||
}
|
||||
/// <summary>
|
||||
/// 异步执行并发的实际方法
|
||||
/// </summary>
|
||||
/// <param name="queryable"></param>
|
||||
/// <param name="async"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public abstract Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
|
||||
CancellationToken cancellationToken = new CancellationToken());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ using System.Threading.Tasks;
|
|||
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.Abstractions;
|
||||
using ShardingCore.Sharding.Enumerators;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
|
||||
|
@ -18,14 +19,14 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
|
|||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : IInMemoryAsyncMergeEngine<TEntity>
|
||||
public abstract class AbstractInMemoryAsyncMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>,IInMemoryAsyncMergeEngine<TEntity>
|
||||
{
|
||||
private readonly MethodCallExpression _methodCallExpression;
|
||||
private readonly StreamMergeContext<TEntity> _mergeContext;
|
||||
private readonly IQueryable<TEntity> _queryable;
|
||||
private readonly Expression _secondExpression;
|
||||
|
||||
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
|
||||
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext):base(methodCallExpression,shardingDbContext)
|
||||
{
|
||||
_methodCallExpression = methodCallExpression;
|
||||
if (methodCallExpression.Arguments.Count < 1 || methodCallExpression.Arguments.Count > 2)
|
||||
|
@ -86,12 +87,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
|
|||
{
|
||||
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
|
||||
|
||||
return Task.Run(async () =>
|
||||
{
|
||||
var queryResult = await efQuery(asyncExecuteQueryable);
|
||||
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
|
||||
|
||||
}, cancellationToken);
|
||||
return AsyncParallelResultExecuteAsync(asyncExecuteQueryable,dataSourceName,routeResult,efQuery, cancellationToken);
|
||||
|
||||
});
|
||||
}).ToArray();
|
||||
|
@ -99,6 +95,28 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
|
|||
return (await Task.WhenAll(enumeratorTasks)).ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 异步并发查询
|
||||
/// </summary>
|
||||
/// <param name="queryable"></param>
|
||||
/// <param name="efQuery"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <typeparam name="TResult"></typeparam>
|
||||
/// <returns></returns>
|
||||
public override async Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var queryResult = await efQuery(queryable);
|
||||
return queryResult;
|
||||
}
|
||||
|
||||
public override Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
|
||||
public virtual IQueryable DoCombineQueryable<TResult>(IQueryable<TEntity> queryable)
|
||||
{
|
||||
return queryable;
|
||||
|
|
|
@ -22,12 +22,12 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
|
|||
* @Ver: 1.0
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : IEnumeratorStreamMergeEngine<TEntity>
|
||||
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : AbstractBaseMergeEngine<TEntity>, IEnumeratorStreamMergeEngine<TEntity>
|
||||
{
|
||||
public StreamMergeContext<TEntity> StreamMergeContext { get; }
|
||||
|
||||
|
||||
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
|
||||
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext):base(streamMergeContext)
|
||||
{
|
||||
StreamMergeContext = streamMergeContext;
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
|
|||
/// <param name="streamsAsyncEnumerators"></param>
|
||||
/// <returns></returns>
|
||||
public abstract IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
|
||||
|
||||
/// <summary>
|
||||
/// 开启异步线程获取并发迭代器
|
||||
/// </summary>
|
||||
|
@ -99,22 +100,27 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
|
|||
/// <param name="async"></param>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelQueryEnumerator(IQueryable<TEntity> queryable, bool async,CancellationToken cancellationToken=new CancellationToken())
|
||||
public override async Task<IStreamMergeAsyncEnumerator<TEntity>> AsyncParallelEnumeratorExecuteAsync0(IQueryable<TEntity> queryable, bool async,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return Task.Run(async () =>
|
||||
if (async)
|
||||
{
|
||||
if (async)
|
||||
{
|
||||
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
|
||||
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
|
||||
}
|
||||
else
|
||||
{
|
||||
var enumerator = GetEnumerator0(queryable);
|
||||
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
|
||||
}
|
||||
}, cancellationToken);
|
||||
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
|
||||
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
|
||||
}
|
||||
else
|
||||
{
|
||||
var enumerator = GetEnumerator0(queryable);
|
||||
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
|
||||
}
|
||||
}
|
||||
|
||||
public override Task<TResult> AsyncParallelResultExecuteAsync0<TResult>(IQueryable queryable, Func<IQueryable, Task<TResult>> efQuery,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取异步迭代器
|
||||
/// </summary>
|
||||
|
|
|
@ -105,7 +105,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
|
||||
{
|
||||
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult, reSetOrders);
|
||||
return AsyncParallelQueryEnumerator(newQueryable, async, cancellationToken);
|
||||
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async, cancellationToken);
|
||||
}).ToArray();
|
||||
|
||||
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
|
||||
|
|
|
@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.Enumer
|
|||
return tableRouteResults.Select(routeResult =>
|
||||
{
|
||||
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult);
|
||||
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
|
||||
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
|
||||
});
|
||||
|
||||
}).ToArray();
|
||||
|
|
|
@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
return StreamMergeContext.TableRouteResults.Select(routeResult =>
|
||||
{
|
||||
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult);
|
||||
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
|
||||
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
|
||||
});
|
||||
}).ToArray();;
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
if (dataSourceOrderMain)
|
||||
{
|
||||
//是否有两级排序
|
||||
var useThenBy = dataSourceOrderMain && _tableSequenceMatchOrderConfig != null;
|
||||
var useThenBy = _tableSequenceMatchOrderConfig != null;
|
||||
if (_isAsc)
|
||||
{
|
||||
sortRouteResults = sortRouteResults.OrderBy(o => o.DataSourceName,
|
||||
|
@ -92,7 +92,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
|
|||
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
|
||||
{
|
||||
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult);
|
||||
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
|
||||
return AsyncParallelEnumeratorExecuteAsync(newQueryable, async,cancellationToken);
|
||||
}).ToArray();
|
||||
|
||||
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
using System.Threading;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.ParallelControl
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: Tuesday, 17 November 2020 16:23:53
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public class DoOnlyOnce
|
||||
{
|
||||
|
||||
private const int Did = 1;
|
||||
private const int UnDo = 0;
|
||||
private int Status = UnDo;
|
||||
|
||||
public bool IsUnDo()
|
||||
{
|
||||
return Interlocked.CompareExchange(ref Status, Did, UnDo) == UnDo;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
using System.Threading;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.ParallelControl
|
||||
{
|
||||
/*
|
||||
* @Author: xjm
|
||||
* @Description:
|
||||
* @Date: Tuesday, 17 November 2020 13:43:39
|
||||
* @Email: 326308290@qq.com
|
||||
*/
|
||||
public class SemaphoreReleaseOnlyOnce
|
||||
{
|
||||
private readonly DoOnlyOnce _doOnlyOnce=new DoOnlyOnce();
|
||||
private readonly SemaphoreSlim _semaphore;
|
||||
|
||||
public SemaphoreReleaseOnlyOnce(SemaphoreSlim semaphore)
|
||||
{
|
||||
_semaphore = semaphore;
|
||||
}
|
||||
|
||||
public void Release()
|
||||
{
|
||||
if (_semaphore != null)
|
||||
{
|
||||
if (_doOnlyOnce.IsUnDo())
|
||||
{
|
||||
_semaphore.Release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -39,7 +39,6 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
|||
private readonly IShardingDbContextOptionsBuilderConfig _shardingDbContextOptionsBuilderConfig;
|
||||
private readonly IRouteTailFactory _routeTailFactory;
|
||||
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
|
||||
private readonly IShardingConfigOption _shardingConfigOption;
|
||||
|
||||
public int ReadWriteSeparationPriority
|
||||
{
|
||||
|
@ -65,8 +64,6 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
|||
|
||||
_routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
|
||||
_actualConnectionStringManager = new ActualConnectionStringManager<TShardingDbContext>();
|
||||
|
||||
_shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>().FirstOrDefault(o => o.ShardingDbContextType == typeof(TShardingDbContext));
|
||||
}
|
||||
|
||||
#region create db context
|
||||
|
@ -125,11 +122,6 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
|
|||
return _actualConnectionStringManager.IsUseReadWriteSeparation();
|
||||
}
|
||||
|
||||
public bool EnableAutoTrack()
|
||||
{
|
||||
return _shardingConfigOption.AutoTrackEntity;
|
||||
}
|
||||
|
||||
public DbContext CreateDbContext(bool parallelQuery, string dataSourceName, IRouteTail routeTail)
|
||||
{
|
||||
|
||||
|
|
|
@ -64,6 +64,7 @@ namespace ShardingCore.Sharding
|
|||
public bool IsCrossTable { get; }
|
||||
|
||||
private readonly ITrackerManager _trackerManager;
|
||||
private readonly IShardingConfigOption _shardingConfigOption;
|
||||
|
||||
private readonly ConcurrentDictionary<DbContext, object> _parallelDbContexts;
|
||||
|
||||
|
@ -92,6 +93,8 @@ namespace ShardingCore.Sharding
|
|||
_trackerManager =
|
||||
(ITrackerManager)ShardingContainer.GetService(
|
||||
typeof(ITrackerManager<>).GetGenericType0(shardingDbContext.GetType()));
|
||||
_shardingConfigOption = ShardingContainer.GetServices<IShardingConfigOption>()
|
||||
.FirstOrDefault(o => o.ShardingDbContextType == shardingDbContext.GetType());
|
||||
_parallelDbContexts = new ConcurrentDictionary<DbContext, object>();
|
||||
//RouteResults = _tableTableRouteRuleEngineFactory.Route(_shardingDbContext.ShardingDbContextType, _source);
|
||||
}
|
||||
|
@ -177,6 +180,15 @@ namespace ShardingCore.Sharding
|
|||
{
|
||||
return _shardingDbContext;
|
||||
}
|
||||
|
||||
public int GetParallelQueryMaxThreadCount()
|
||||
{
|
||||
return _shardingConfigOption.ParallelQueryMaxThreadCount;
|
||||
}
|
||||
public TimeSpan GetParallelQueryTimeOut()
|
||||
{
|
||||
return _shardingConfigOption.ParallelQueryTimeOut;
|
||||
}
|
||||
/// <summary>
|
||||
/// ÊÇ·ñÊÇ¿ç×ÊÔ´²éѯ
|
||||
/// </summary>
|
||||
|
@ -197,7 +209,7 @@ namespace ShardingCore.Sharding
|
|||
/// <returns></returns>
|
||||
private bool IsParallelQuery()
|
||||
{
|
||||
return !_shardingDbContext.EnableAutoTrack()|| IsCrossQuery() || IsUseReadWriteSeparation();
|
||||
return !_shardingConfigOption.AutoTrackEntity|| IsCrossQuery() || IsUseReadWriteSeparation();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -178,6 +178,15 @@ namespace ShardingCore
|
|||
/// </summary>
|
||||
public bool AutoTrackEntity { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 单次查询并发线程数目(-1表示不限制)
|
||||
/// </summary>
|
||||
public int ParallelQueryMaxThreadCount { get; set; } = 65536;
|
||||
/// <summary>
|
||||
/// 默认30秒超时
|
||||
/// </summary>
|
||||
public TimeSpan ParallelQueryTimeOut { get; set; }=TimeSpan.FromSeconds(30);
|
||||
|
||||
public string DefaultDataSourceName { get; set; }
|
||||
public string DefaultConnectionString { get; set; }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netstandard2.1</TargetFramework>
|
||||
<Version>$(EFCORE5)</Version>
|
||||
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
|
||||
<DefineConstants>TRACE;DEBUG;EFCORE5;</DefineConstants>
|
||||
<LangVersion>8.0</LangVersion>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
|
||||
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
|
||||
<DocumentationFile>bin\Release\ShardingCore.xml</DocumentationFile>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.10" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.10" />
|
||||
</ItemGroup>
|
||||
</Project>
|
Loading…
Reference in New Issue