From 85f854577de59c2ed996d7f942498f4717678236 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Wed, 29 Sep 2021 14:42:50 +0800 Subject: [PATCH] =?UTF-8?q?efcore2=E4=B8=8B=E4=BF=AE=E5=A4=8Diasyncenumera?= =?UTF-8?q?ble=E7=9A=84current=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nuget-publish.bat | 6 +- .../EFCore2TryCurrentAsyncEnumerator.cs | 42 +++++++ .../StreamMergeAsyncEnumerator.cs | 107 +++++------------- ...bstractEnumeratorAsyncStreamMergeEngine.cs | 4 +- 4 files changed, 74 insertions(+), 85 deletions(-) create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/EFCore2x/EFCore2TryCurrentAsyncEnumerator.cs diff --git a/nuget-publish.bat b/nuget-publish.bat index 85d1f74a..99f4c35c 100644 --- a/nuget-publish.bat +++ b/nuget-publish.bat @@ -1,8 +1,8 @@ :start ::定义版本 -set EFCORE2=2.2.0.21 -set EFCORE3=3.2.0.21 -set EFCORE5=5.2.0.21 +set EFCORE2=2.2.0.22-efcore2.1.0 +set EFCORE3=3.2.0.22 +set EFCORE5=5.2.0.22 ::删除所有bin与obj下的文件 @echo off diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/EFCore2x/EFCore2TryCurrentAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/EFCore2x/EFCore2TryCurrentAsyncEnumerator.cs new file mode 100644 index 00000000..d953ed5a --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/EFCore2x/EFCore2TryCurrentAsyncEnumerator.cs @@ -0,0 +1,42 @@ + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/9/29 14:11:30 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ +#if EFCORE2 + public class EFCore2TryCurrentAsyncEnumerator:IAsyncEnumerator + { + private readonly IAsyncEnumerator _asyncEnumerator; + private bool currentMoved=false; + + public EFCore2TryCurrentAsyncEnumerator(IAsyncEnumerator asyncEnumerator) + { + _asyncEnumerator = asyncEnumerator; + } + public void Dispose() + { + _asyncEnumerator?.Dispose(); + } + + public async Task MoveNext(CancellationToken cancellationToken) + { + var moveNext = await _asyncEnumerator.MoveNext(cancellationToken); + currentMoved = moveNext; + return moveNext; + } + + public T Current => currentMoved ? _asyncEnumerator.Current : default; + } +#endif +} diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs index 4b2c71b4..0149a721 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs @@ -6,12 +6,6 @@ using System.Threading.Tasks; namespace ShardingCore.Sharding.Enumerators { - /* - * @Author: xjm - * @Description: - * @Date: Saturday, 14 August 2021 21:25:50 - * @Email: 326308290@qq.com - */ public class StreamMergeAsyncEnumerator : IStreamMergeAsyncEnumerator { private readonly IAsyncEnumerator _asyncSource; @@ -22,6 +16,7 @@ namespace ShardingCore.Sharding.Enumerators { if (_syncSource != null) throw new ArgumentNullException(nameof(_syncSource)); + _asyncSource = asyncSource; skip = true; } @@ -60,18 +55,38 @@ namespace ShardingCore.Sharding.Enumerators return await _asyncSource.MoveNextAsync(); } - public T Current => GetCurrent(); - public T ReallyCurrent => GetReallyCurrent(); + public void Dispose() + { + _syncSource?.Dispose(); + } + +#endif + public bool MoveNext() + { + if (skip) + { + skip = false; + return null != _syncSource.Current; + } + return _syncSource.MoveNext(); + } + public bool HasElement() { if (_asyncSource != null) return null != _asyncSource.Current; if (_syncSource != null) return null != _syncSource.Current; return false; } - public void Dispose() + + + public void Reset() { - _syncSource?.Dispose(); + throw new NotImplementedException(); } + + object IEnumerator.Current => Current; + public T Current => GetCurrent(); + public T ReallyCurrent => GetReallyCurrent(); public T GetCurrent() { if (skip) @@ -86,26 +101,6 @@ namespace ShardingCore.Sharding.Enumerators if (_syncSource != null) return _syncSource.Current; return default; } - public bool MoveNext() - { - if (skip) - { - skip = false; - return null != _syncSource.Current; - } - return _syncSource.MoveNext(); - } - -#endif - - - - public void Reset() - { - throw new NotImplementedException(); - } - - object IEnumerator.Current => Current; #if EFCORE2 public void Dispose() { @@ -118,61 +113,11 @@ namespace ShardingCore.Sharding.Enumerators if (skip) { skip = false; - return null != SourceCurrent(); + return null != _asyncSource.Current; } return await _asyncSource.MoveNext(cancellationToken); } - public T Current => GetCurrent(); - public T ReallyCurrent => GetReallyCurrent(); - public bool HasElement() - { - return null != SourceCurrent(); - } - private T SourceCurrent() - { - try - { - if (tryGetCurrentError) - return default; - if (_asyncSource!= null) - return _asyncSource.Current; - if (_syncSource != null) - return _syncSource.Current; - return default; - } - catch (Exception e) - { - tryGetCurrentError = true; - return default; - } - } - - private bool tryGetCurrentError = false; - - public T GetCurrent() - { - if (skip) - return default; - if (_asyncSource != null) return SourceCurrent(); - if (_syncSource != null) return _syncSource.Current; - return default; - } - public T GetReallyCurrent() - { - if (_asyncSource != null) return SourceCurrent(); - if (_syncSource != null) return _syncSource.Current; - return default; - } - public bool MoveNext() - { - if (skip) - { - skip = false; - return null != _syncSource.Current; - } - return _syncSource.MoveNext(); - } #endif } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs index ab0879b4..b6c7680c 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs @@ -10,6 +10,7 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x; #if EFCORE2 using Microsoft.EntityFrameworkCore.Extensions.Internal; #endif @@ -66,7 +67,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. return enumator; #endif #if EFCORE2 - var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator(); + var enumator = + new EFCore2TryCurrentAsyncEnumerator(newQueryable.AsAsyncEnumerable().GetEnumerator()); await enumator.MoveNext(); return enumator; #endif