修复同步线程上下文导致deadlock的bug

This commit is contained in:
xuejiaming 2022-10-14 23:26:20 +08:00
parent 01b485e497
commit ffb6d3ee20
13 changed files with 30 additions and 39 deletions

View File

@ -13,42 +13,31 @@ using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors
{
internal class ShardingExecutor
internal static class ShardingExecutor
{
private static readonly ShardingExecutor _instance;
private ShardingExecutor()
{
}
static ShardingExecutor()
{
_instance = new ShardingExecutor();
}
public static ShardingExecutor Instance => _instance;
public TResult Execute<TResult>(StreamMergeContext streamMergeContext,
public static TResult Execute<TResult>(StreamMergeContext streamMergeContext,
IExecutor<TResult> executor, bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits,
CancellationToken cancellationToken = new CancellationToken())
{
return ExecuteAsync<TResult>(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken)
.WaitAndUnwrapException();
return ExecuteAsync(streamMergeContext, executor,async,sqlRouteUnits,cancellationToken).WaitAndUnwrapException(false);
}
public async Task<TResult> ExecuteAsync<TResult>(StreamMergeContext streamMergeContext,
public static async Task<TResult> ExecuteAsync<TResult>(StreamMergeContext streamMergeContext,
IExecutor<TResult> executor, bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits,
CancellationToken cancellationToken = new CancellationToken())
{
var resultGroups =
Execute0<TResult>(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken);
var results =(await TaskHelper.WhenAllFastFail(resultGroups)).SelectMany(o => o)
Execute0<TResult>(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken).ToArray();
var results = (await TaskHelper.WhenAllFastFail(resultGroups).ConfigureAwait(false)).SelectMany(o => o)
.ToList();
if (results.IsEmpty())
throw new ShardingCoreException("sharding execute result empty");
return executor.GetShardingMerger().StreamMerge(results);
var streamMerge = executor.GetShardingMerger().StreamMerge(results);
return streamMerge;
}
private Task<List<TResult>>[] Execute0<TResult>(StreamMergeContext streamMergeContext,
private static Task<List<TResult>>[] Execute0<TResult>(StreamMergeContext streamMergeContext,
IExecutor<TResult> executor, bool async, IEnumerable<ISqlRouteUnit> sqlRouteUnits,
CancellationToken cancellationToken = new CancellationToken())
{
@ -87,7 +76,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors
/// <param name="streamMergeContext"></param>
/// <param name="sqlRouteUnits"></param>
/// <returns></returns>
private IEnumerable<ISqlRouteUnit> ReOrderTableTails(StreamMergeContext streamMergeContext,
private static IEnumerable<ISqlRouteUnit> ReOrderTableTails(StreamMergeContext streamMergeContext,
IEnumerable<ISqlRouteUnit> sqlRouteUnits)
{
if (streamMergeContext.IsSeqQuery())
@ -106,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors
/// <param name="streamMergeContext"></param>
/// <param name="sqlGroups"></param>
/// <returns></returns>
protected DataSourceSqlExecutorUnit GetSqlExecutorGroups(StreamMergeContext streamMergeContext,
private static DataSourceSqlExecutorUnit GetSqlExecutorGroups(StreamMergeContext streamMergeContext,
IGrouping<string, ISqlRouteUnit> sqlGroups)
{
var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit();

View File

@ -23,7 +23,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I
protected abstract IExecutor<TResult> CreateExecutor();
public virtual TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException();
return MergeResultAsync().WaitAndUnwrapException(false);
}
public virtual async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor();
var result =await ShardingExecutor.Instance.ExecuteAsync<TResult>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken);
var result =await ShardingExecutor.ExecuteAsync<TResult>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken).ConfigureAwait(false);
return result;
}
}

View File

@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I
protected abstract IExecutor<RouteQueryResult<TResult>> CreateExecutor();
public virtual TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException();
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -29,7 +29,8 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor();
var result =await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TResult>>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken);
var result =await ShardingExecutor.ExecuteAsync<RouteQueryResult<TResult>>(GetStreamMergeContext(),
executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false);
return result.QueryResult;
}
}

View File

@ -71,7 +71,8 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.S
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor(async);
return ShardingExecutor.Instance.Execute<IStreamMergeAsyncEnumerator<TEntity>>(GetStreamMergeContext(),executor,async,defaultSqlRouteUnits,cancellationToken);
return ShardingExecutor.Execute<IStreamMergeAsyncEnumerator<TEntity>>(GetStreamMergeContext(), executor,
async, defaultSqlRouteUnits, cancellationToken);
}
// public abstract IShardingExecutor GetShardingExecutor();

View File

@ -34,7 +34,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
public TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException();
return MergeResultAsync().WaitAndUnwrapException(false);
}
public async Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -46,7 +46,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor();
var result =await ShardingExecutor.Instance.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken);
var result =await ShardingExecutor.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken).ConfigureAwait(false);
var sum = result.QueryResult.Sum;
var count = result.QueryResult.Count;
// var resultList = await base.ExecuteAsync<AverageResult<TSelect>>(cancellationToken);

View File

@ -37,7 +37,7 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false);
return list.FirstOrDefault();
}

View File

@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list =await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take,cancellationToken);
var list =await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take,cancellationToken).ConfigureAwait(false);
return list.First();
}

View File

@ -62,7 +62,7 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken).ConfigureAwait(false);
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.FirstOrDefault();
return default;

View File

@ -62,7 +62,7 @@ namespace ShardingCore.Sharding.MergeEngines
//将toke改成1
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var maxVirtualElementCount = skip.GetValueOrDefault() + 1;
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken).ConfigureAwait(false);
if (list.VirtualElementCount >= maxVirtualElementCount)
return list.First();

View File

@ -33,7 +33,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
public TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException();
return MergeResultAsync().WaitAndUnwrapException(false);
}
@ -95,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor<TR>();
var result = await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken);
var result = await ShardingExecutor.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false);
return result.QueryResult;
}
protected IExecutor<RouteQueryResult<TR>> CreateExecutor<TR>()

View File

@ -33,7 +33,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
public TResult MergeResult()
{
return MergeResultAsync().WaitAndUnwrapException();
return MergeResultAsync().WaitAndUnwrapException(false);
}
@ -95,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
}
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits();
var executor = CreateExecutor<TR>();
var result = await ShardingExecutor.Instance.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken);
var result = await ShardingExecutor.ExecuteAsync<RouteQueryResult<TR>>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false);
return result.QueryResult;
}

View File

@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false);
return list.SingleOrDefault();
}

View File

@ -50,7 +50,7 @@ namespace ShardingCore.Sharding.MergeEngines
var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine<TEntity>(_streamMergeContext);
var take = _streamMergeContext.GetTake();
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken);
var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false);
return list.Single();
}