优化并发查询控制使用异步
This commit is contained in:
parent
a317ceed0d
commit
d91dd52c5e
|
@ -7,13 +7,9 @@ using Microsoft.Extensions.Logging;
|
||||||
using Sample.SqlServer.DbContexts;
|
using Sample.SqlServer.DbContexts;
|
||||||
using Sample.SqlServer.Shardings;
|
using Sample.SqlServer.Shardings;
|
||||||
using ShardingCore;
|
using ShardingCore;
|
||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using Sample.SqlServer.Domain.Entities;
|
|
||||||
using ShardingCore.Extensions;
|
|
||||||
using ShardingCore.Sharding.ReadWriteConfigurations;
|
using ShardingCore.Sharding.ReadWriteConfigurations;
|
||||||
using ShardingCore.Sharding.ShardingComparision;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
namespace Sample.SqlServer
|
namespace Sample.SqlServer
|
||||||
{
|
{
|
||||||
|
|
|
@ -38,6 +38,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||||
_semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount()));
|
_semaphore = new SemaphoreSlim(Math.Max(1, streamMergeContext.GetParallelQueryMaxThreadCount()));
|
||||||
_parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut();
|
_parallelQueryTimeOut = streamMergeContext.GetParallelQueryTimeOut();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 异步多线程控制并发
|
/// 异步多线程控制并发
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -45,31 +46,21 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||||
/// <param name="executeAsync"></param>
|
/// <param name="executeAsync"></param>
|
||||||
/// <param name="cancellationToken"></param>
|
/// <param name="cancellationToken"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
|
public async Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,CancellationToken cancellationToken=new CancellationToken())
|
||||||
{
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds;
|
var parallelTimeOut = _parallelQueryTimeOut.TotalMilliseconds;
|
||||||
var acquired = this._semaphore.Wait((int)parallelTimeOut);
|
var acquired = await this._semaphore.WaitAsync((int)parallelTimeOut, cancellationToken);
|
||||||
if (acquired)
|
if (acquired)
|
||||||
{
|
{
|
||||||
var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
|
var once = new SemaphoreReleaseOnlyOnce(this._semaphore);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return Task.Run(async () =>
|
return await executeAsync();
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return await executeAsync();
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
once.Release();
|
|
||||||
}
|
|
||||||
}, cancellationToken);
|
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
finally
|
||||||
{
|
{
|
||||||
once.Release();
|
once.Release();
|
||||||
throw e;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -77,5 +68,10 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||||
throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint());
|
throw new ShardingCoreParallelQueryTimeOutException(_executeExpression.ShardingPrint());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
_semaphore?.Dispose();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
|
||||||
* @Ver: 1.0
|
* @Ver: 1.0
|
||||||
* @Email: 326308290@qq.com
|
* @Email: 326308290@qq.com
|
||||||
*/
|
*/
|
||||||
internal interface IAsyncParallelLimit
|
internal interface IAsyncParallelLimit:IDisposable
|
||||||
{
|
{
|
||||||
Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,
|
Task<TResult> AsyncParallelLimitExecuteAsync<TResult>(Func<Task<TResult>> executeAsync,
|
||||||
CancellationToken cancellationToken = new CancellationToken());
|
CancellationToken cancellationToken = new CancellationToken());
|
||||||
|
|
Loading…
Reference in New Issue