From d91dd52c5e383af92fde80fa7866c3312422edc0 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Tue, 30 Nov 2021 09:53:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=B9=B6=E5=8F=91=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E6=8E=A7=E5=88=B6=E4=BD=BF=E7=94=A8=E5=BC=82=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- samples/Sample.SqlServer/Startup.cs | 8 ++---- .../Abstractions/AbstractBaseMergeEngine.cs | 26 ++++++++----------- .../Abstractions/IAsyncParallelLimit.cs | 2 +- 3 files changed, 14 insertions(+), 22 deletions(-) diff --git a/samples/Sample.SqlServer/Startup.cs b/samples/Sample.SqlServer/Startup.cs index 46030490..b253a8b6 100644 --- a/samples/Sample.SqlServer/Startup.cs +++ b/samples/Sample.SqlServer/Startup.cs @@ -7,13 +7,9 @@ using Microsoft.Extensions.Logging; using Sample.SqlServer.DbContexts; using Sample.SqlServer.Shardings; using ShardingCore; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using Sample.SqlServer.Domain.Entities; -using ShardingCore.Extensions; using ShardingCore.Sharding.ReadWriteConfigurations; -using ShardingCore.Sharding.ShardingComparision; +using System; +using System.Collections.Generic; namespace Sample.SqlServer { diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs index 503abf8a..b61adf34 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/AbstractBaseMergeEngine.cs @@ -38,6 +38,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions _semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount())); _parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut(); } + /// /// 异步多线程控制并发 /// @@ -45,31 +46,21 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions /// /// /// - public Task AsyncParallelLimitExecuteAsync(Func> executeAsync,CancellationToken cancellationToken=new CancellationToken()) + public async Task AsyncParallelLimitExecuteAsync(Func> executeAsync,CancellationToken cancellationToken=new CancellationToken()) { + cancellationToken.ThrowIfCancellationRequested(); var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds; - var acquired = this._semaphore.Wait((int)parallelTimeOut); + var acquired = await this._semaphore.WaitAsync((int)parallelTimeOut, cancellationToken); if (acquired) { var once = new SemaphoreReleaseOnlyOnce(this._semaphore); try { - return Task.Run(async () => - { - try - { - return await executeAsync(); - } - finally - { - once.Release(); - } - }, cancellationToken); + return await executeAsync(); } - catch (Exception e) + finally { once.Release(); - throw e; } } else @@ -77,5 +68,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint()); } } + + public void Dispose() + { + _semaphore?.Dispose(); + } } } diff --git a/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs b/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs index 81b91859..dd3d9ba3 100644 --- a/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs +++ b/src/ShardingCore/Sharding/MergeEngines/Abstractions/IAsyncParallelLimit.cs @@ -13,7 +13,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - internal interface IAsyncParallelLimit + internal interface IAsyncParallelLimit:IDisposable { Task AsyncParallelLimitExecuteAsync(Func> executeAsync, CancellationToken cancellationToken = new CancellationToken());