diff --git a/samples/Sample.SqlServer/Startup.cs b/samples/Sample.SqlServer/Startup.cs index 46030490..b253a8b6 100644 --- a/samples/Sample.SqlServer/Startup.cs +++ b/samples/Sample.SqlServer/Startup.cs @@ -7,13 +7,9 @@ using Microsoft.Extensions.Logging; using Sample.SqlServer.DbContexts; using Sample.SqlServer.Shardings; using ShardingCore; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using Sample.SqlServer.Domain.Entities; -using ShardingCore.Extensions; using ShardingCore.Sharding.ReadWriteConfigurations; -using ShardingCore.Sharding.ShardingComparision; +using System; +using System.Collections.Generic; namespace Sample.SqlServer { diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs index 503abf8a..b61adf34 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs @@ -38,6 +38,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions _semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount())); _parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut(); } + /// /// 异步多线程控制并发 /// @@ -45,31 +46,21 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions /// /// /// - public Task AsyncParallelLimitExecuteAsync(Func> executeAsync,CancellationToken cancellationToken=new CancellationToken()) + public async Task AsyncParallelLimitExecuteAsync(Func> executeAsync,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds; - var acquired = this._semaphore.Wait((int)parallelTimeOut); + var acquired = await this._semaphore.WaitAsync((int)parallelTimeOut, cancellationToken); if (acquired) { var once = new SemaphoreReleaseOnlyOnce(this._semaphore); try { - return Task.Run(async () => - { - try - { - return await executeAsync(); - } - finally - { - once.Release(); - } - }, cancellationToken); + return await executeAsync(); } - catch (Exception e) + finally { once.Release(); - throw e; } } else @@ -77,5 +68,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint()); } } + + public void Dispose() + { + _semaphore?.Dispose(); + } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs index 81b91859..dd3d9ba3 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs @@ -13,7 +13,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - internal interface IAsyncParallelLimit + internal interface IAsyncParallelLimit:IDisposable { Task AsyncParallelLimitExecuteAsync(Func> executeAsync, CancellationToken cancellationToken = new CancellationToken());