diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingExecutors/ShardingExecutor.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingExecutors/ShardingExecutor.cs index e88f1d59..374922e2 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingExecutors/ShardingExecutor.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingExecutors/ShardingExecutor.cs @@ -13,42 +13,31 @@ using ShardingCore.Sharding.MergeEngines.Executors.Abstractions; namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors { - internal class ShardingExecutor + internal static class ShardingExecutor { - private static readonly ShardingExecutor _instance; - private ShardingExecutor() - { - } - static ShardingExecutor() - { - _instance = new ShardingExecutor(); - } - - public static ShardingExecutor Instance => _instance; - - public TResult Execute(StreamMergeContext streamMergeContext, + public static TResult Execute(StreamMergeContext streamMergeContext, IExecutor executor, bool async, IEnumerable sqlRouteUnits, CancellationToken cancellationToken = new CancellationToken()) { - return ExecuteAsync(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken) - .WaitAndUnwrapException(); + return ExecuteAsync(streamMergeContext, executor,async,sqlRouteUnits,cancellationToken).WaitAndUnwrapException(false); } - public async Task ExecuteAsync(StreamMergeContext streamMergeContext, + public static async Task ExecuteAsync(StreamMergeContext streamMergeContext, IExecutor executor, bool async, IEnumerable sqlRouteUnits, CancellationToken cancellationToken = new CancellationToken()) { var resultGroups = - Execute0(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken); - var results =(await TaskHelper.WhenAllFastFail(resultGroups)).SelectMany(o => o) + Execute0(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken).ToArray(); + var results = (await TaskHelper.WhenAllFastFail(resultGroups).ConfigureAwait(false)).SelectMany(o => o) .ToList(); if (results.IsEmpty()) throw new ShardingCoreException("sharding execute result empty"); - return executor.GetShardingMerger().StreamMerge(results); + var streamMerge = executor.GetShardingMerger().StreamMerge(results); + return streamMerge; } - private Task>[] Execute0(StreamMergeContext streamMergeContext, + private static Task>[] Execute0(StreamMergeContext streamMergeContext, IExecutor executor, bool async, IEnumerable sqlRouteUnits, CancellationToken cancellationToken = new CancellationToken()) { @@ -87,7 +76,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors /// /// /// - private IEnumerable ReOrderTableTails(StreamMergeContext streamMergeContext, + private static IEnumerable ReOrderTableTails(StreamMergeContext streamMergeContext, IEnumerable sqlRouteUnits) { if (streamMergeContext.IsSeqQuery()) @@ -106,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingExecutors /// /// /// - protected DataSourceSqlExecutorUnit GetSqlExecutorGroups(StreamMergeContext streamMergeContext, + private static DataSourceSqlExecutorUnit GetSqlExecutorGroups(StreamMergeContext streamMergeContext, IGrouping sqlGroups) { var maxQueryConnectionsLimit = streamMergeContext.GetMaxQueryConnectionsLimit(); diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureMergeEngine.cs index 79d30480..a4b25816 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureMergeEngine.cs @@ -23,7 +23,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I protected abstract IExecutor CreateExecutor(); public virtual TResult MergeResult() { - return MergeResultAsync().WaitAndUnwrapException(); + return MergeResultAsync().WaitAndUnwrapException(false); } public virtual async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) @@ -35,7 +35,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I } var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(); - var result =await ShardingExecutor.Instance.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken); + var result =await ShardingExecutor.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken).ConfigureAwait(false); return result; } } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs index ec16d236..6580dbb2 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/InMemoryMerge/AbstractMethodEnsureWrapMergeEngine.cs @@ -17,7 +17,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I protected abstract IExecutor> CreateExecutor(); public virtual TResult MergeResult() { - return MergeResultAsync().WaitAndUnwrapException(); + return MergeResultAsync().WaitAndUnwrapException(false); } public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) @@ -29,7 +29,8 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.I } var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(); - var result =await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken); + var result =await ShardingExecutor.ExecuteAsync>(GetStreamMergeContext(), + executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false); return result.QueryResult; } } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/StreamMerge/AbstractStreamEnumerable.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/StreamMerge/AbstractStreamEnumerable.cs index 6ca5881a..44565e92 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/StreamMerge/AbstractStreamEnumerable.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/Abstractions/StreamMerge/AbstractStreamEnumerable.cs @@ -71,7 +71,8 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.S var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(async); - return ShardingExecutor.Instance.Execute>(GetStreamMergeContext(),executor,async,defaultSqlRouteUnits,cancellationToken); + return ShardingExecutor.Execute>(GetStreamMergeContext(), executor, + async, defaultSqlRouteUnits, cancellationToken); } // public abstract IShardingExecutor GetShardingExecutor(); diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/AverageAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/AverageAsyncInMemoryMergeEngine.cs index dc971bfe..a807e972 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/AverageAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/AverageAsyncInMemoryMergeEngine.cs @@ -34,7 +34,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines public TResult MergeResult() { - return MergeResultAsync().WaitAndUnwrapException(); + return MergeResultAsync().WaitAndUnwrapException(false); } public async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) @@ -46,7 +46,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines } var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(); - var result =await ShardingExecutor.Instance.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken); + var result =await ShardingExecutor.ExecuteAsync(GetStreamMergeContext(),executor,true,defaultSqlRouteUnits,cancellationToken).ConfigureAwait(false); var sum = result.QueryResult.Sum; var count = result.QueryResult.Count; // var resultList = await base.ExecuteAsync>(cancellationToken); diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs index cfd45379..f5b90e60 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstOrDefaultSkipAsyncInMemoryMergeEngine.cs @@ -37,7 +37,7 @@ namespace ShardingCore.Sharding.MergeEngines var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var take = _streamMergeContext.GetTake(); - var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken); + var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false); return list.FirstOrDefault(); } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs index 4e6157d8..343e6aab 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/FirstSkipAsyncInMemoryMergeEngine.cs @@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.MergeEngines //将toke改成1 var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var take = _streamMergeContext.GetTake(); - var list =await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take,cancellationToken); + var list =await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take,cancellationToken).ConfigureAwait(false); return list.First(); } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastOrDefaultSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastOrDefaultSkipAsyncInMemoryMergeEngine.cs index 82888f30..51ef8fe3 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastOrDefaultSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastOrDefaultSkipAsyncInMemoryMergeEngine.cs @@ -62,7 +62,7 @@ namespace ShardingCore.Sharding.MergeEngines var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var maxVirtualElementCount = skip.GetValueOrDefault() + 1; - var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken); + var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken).ConfigureAwait(false); if (list.VirtualElementCount >= maxVirtualElementCount) return list.FirstOrDefault(); return default; diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastSkipAsyncInMemoryMergeEngine.cs index 8f28a135..a835cae4 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/LastSkipAsyncInMemoryMergeEngine.cs @@ -62,7 +62,7 @@ namespace ShardingCore.Sharding.MergeEngines //将toke改成1 var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var maxVirtualElementCount = skip.GetValueOrDefault() + 1; - var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken); + var list = await asyncEnumeratorStreamMergeEngine.ToFixedElementStreamListAsync(1,maxVirtualElementCount, cancellationToken).ConfigureAwait(false); if (list.VirtualElementCount >= maxVirtualElementCount) return list.First(); diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs index c28c9a7c..c7673336 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MaxAsyncInMemoryMergeEngine.cs @@ -33,7 +33,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines public TResult MergeResult() { - return MergeResultAsync().WaitAndUnwrapException(); + return MergeResultAsync().WaitAndUnwrapException(false); } @@ -95,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines } var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(); - var result = await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken); + var result = await ShardingExecutor.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false); return result.QueryResult; } protected IExecutor> CreateExecutor() diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs index f635f35a..a2da1651 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/MinAsyncInMemoryMergeEngine.cs @@ -33,7 +33,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines public TResult MergeResult() { - return MergeResultAsync().WaitAndUnwrapException(); + return MergeResultAsync().WaitAndUnwrapException(false); } @@ -95,7 +95,7 @@ namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines } var defaultSqlRouteUnits = GetDefaultSqlRouteUnits(); var executor = CreateExecutor(); - var result = await ShardingExecutor.Instance.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken); + var result = await ShardingExecutor.ExecuteAsync>(GetStreamMergeContext(), executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false); return result.QueryResult; } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleOrDefaultSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleOrDefaultSkipAsyncInMemoryMergeEngine.cs index 07fdaf79..94aad2c6 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleOrDefaultSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleOrDefaultSkipAsyncInMemoryMergeEngine.cs @@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.MergeEngines var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var take = _streamMergeContext.GetTake(); - var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken); + var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false); return list.SingleOrDefault(); } diff --git a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleSkipAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleSkipAsyncInMemoryMergeEngine.cs index 39876efe..795705cc 100644 --- a/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleSkipAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/MergeEngines/ShardingMergeEngines/SingleSkipAsyncInMemoryMergeEngine.cs @@ -50,7 +50,7 @@ namespace ShardingCore.Sharding.MergeEngines var asyncEnumeratorStreamMergeEngine = new AsyncEnumeratorStreamMergeEngine(_streamMergeContext); var take = _streamMergeContext.GetTake(); - var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken); + var list = await asyncEnumeratorStreamMergeEngine.ToStreamListAsync(take, cancellationToken).ConfigureAwait(false); return list.Single(); }