diff --git a/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs b/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs index 2cb27e91..cfd9cdf1 100644 --- a/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs +++ b/src/ShardingCore/Core/VirtualTables/IVirtualTable.cs @@ -30,7 +30,7 @@ namespace ShardingCore.Core.VirtualTables /// /// 是否启用分页配置 /// - bool EnablePagination => PaginationMetadata != null; + bool EnablePagination { get; } /// /// 获取所有的物理表 diff --git a/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs b/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs index eff6e5fd..0cf79390 100644 --- a/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs +++ b/src/ShardingCore/Core/VirtualTables/OneDbVirtualTable.cs @@ -31,6 +31,7 @@ namespace ShardingCore.Core.VirtualTables public ShardingTableConfig ShardingConfig { get; } public PaginationMetadata PaginationMetadata { get; } + public bool EnablePagination => PaginationMetadata != null; private readonly List _physicTables = new List(); diff --git a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs index 5decac82..47cf2256 100644 --- a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs +++ b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs @@ -68,104 +68,12 @@ namespace ShardingCore.EFCores #endif -#if EFCORE2 - - private IAsyncEnumerable AsyncEnumerableExecute(IShardingDbContext shardingDbContext, Expression query) - { - Type queryEntityType = query.Type.GetSequenceType(); - Type type = typeof(EnumerableQuery<>); - type = type.MakeGenericType(queryEntityType); - var queryable = Activator.CreateInstance(type, query); - - var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); - if (streamMergeContextMethod == null) - throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); - var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); - - - Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - return (IAsyncEnumerable)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); - } - private Task EnumerableExecuteAsync(IShardingDbContext shardingDbContext, Expression query, bool async) - { - Type queryEntityType; - if (async) - queryEntityType = typeof(TResult).GetGenericArguments()[0]; - else - { - queryEntityType = query.Type.GetSequenceType(); - } - Type type = typeof(EnumerableQuery<>); - type = type.MakeGenericType(queryEntityType); - var queryable = Activator.CreateInstance(type, query); - - var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); - if (streamMergeContextMethod == null) - throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); - var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); - - - Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - return (Task)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); - } - private Task GenericMergeExecuteAsync(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - var queryEntityType = query.GetQueryEntityType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (Task)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params); - } - - - private Task EnsureMergeExecuteAsync(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType()); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (Task)streamEngineMethod.Invoke(streamEngine, @params); - } - private Task GenericMergeExecuteAsync2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) - { - var queryEntityType = query.GetQueryEntityType(); - var resultType = query.GetResultType(); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType,resultType); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult)); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = async ? new object[] { cancellationToken } : new object[0]; - return (Task)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params); - } - - - private Task EnsureMergeExecuteAsync2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) - { - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult)); - var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureMergeResult.MergeResultAsync) ); - if (streamEngineMethod == null) - throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - var @params = new object[] { cancellationToken }; - return (Task)streamEngineMethod.Invoke(streamEngine, @params); - } - -#endif - #if EFCORE2 public IAsyncEnumerable ExecuteAsync(Expression query) { - return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query, cancellationToken); + return _shardingQueryExecutor.ExecuteAsync>(_currentContext, query); } public Task ExecuteAsync(Expression query, CancellationToken cancellationToken) diff --git a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs index 47d0e080..b9a4ce5c 100644 --- a/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/Abstractions/IShardingQueryExecutor.cs @@ -2,6 +2,9 @@ using System.Linq.Expressions; using System.Threading; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Sharding.Enumerators; +#if EFCORE2 +using Microsoft.EntityFrameworkCore.Internal; +#endif namespace ShardingCore.Sharding.Abstractions { diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs index 6ef6bbb0..196f3f5f 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryReverseStreamMergeAsyncEnumerator.cs @@ -3,6 +3,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync @@ -23,6 +24,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync { _inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator; } +#if !EFCORE2 public async ValueTask DisposeAsync() { await _inMemoryStreamMergeAsyncEnumerator.DisposeAsync(); @@ -45,6 +47,25 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync return _reverseEnumerator.MoveNext(); } +#endif +#if EFCORE2 + public async Task MoveNext(CancellationToken cancellationToken) + { + if (_first) + { + LinkedList _reverseCollection = new LinkedList(); + while (await _inMemoryStreamMergeAsyncEnumerator.MoveNext(cancellationToken)) + { + _reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent()); + } + + _reverseEnumerator = _reverseCollection.GetEnumerator(); + _first = false; + } + + return _reverseEnumerator.MoveNext(); + } +#endif public bool MoveNext() { @@ -68,6 +89,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync throw new NotImplementedException(); } + object IEnumerator.Current => Current; public T Current => GetCurrent(); diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs index 1959298b..dba826ee 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiAggregateOrderStreamMergeAsyncEnumerator.cs @@ -233,15 +233,6 @@ namespace ShardingCore.Sharding.Enumerators } } #endif -#if EFCORE2 - public void Dispose() - { - foreach (var enumerator in _enumerators) - { - enumerator.Dispose(); - } - } -#endif public void Reset() diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs index 343c6560..c7bdeb9b 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/MultiOrderStreamMergeAsyncEnumerator.cs @@ -138,16 +138,6 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync } } #endif -#if EFCORE2 - - public void Dispose() - { - foreach (var enumerator in _enumerators) - { - enumerator.Dispose(); - } - } -#endif @@ -161,7 +151,10 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync public T Current => skipFirst ? default : _currentEnumerator.GetCurrent(); public void Dispose() { - _currentEnumerator.Dispose(); + foreach (var enumerator in _enumerators) + { + enumerator.Dispose(); + } } } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs index e2715423..3b8633a4 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/OrderStreamMergeAsyncEnumerator.cs @@ -135,12 +135,6 @@ namespace ShardingCore.Sharding.Enumerators return _enumerator.DisposeAsync(); } #endif -#if EFCORE2 - public void Dispose() - { - _enumerator.Dispose(); - } -#endif } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs index f12be9eb..80b6401a 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/PaginationStreamMergeAsyncEnumerator.cs @@ -132,13 +132,6 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync { return _enumerator.DisposeAsync(); } -#endif -#if EFCORE2 - - public void Dispose() - { - _enumerator.Dispose(); - } #endif } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs index fd3864a4..163b520d 100644 --- a/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/StreamMergeAsyncEnumerator.cs @@ -60,23 +60,6 @@ namespace ShardingCore.Sharding.Enumerators return await _asyncSource.MoveNextAsync(); } - public bool MoveNext() - { - if (skip) - { - skip = false; - return null != _syncSource.Current; - } - return _syncSource.MoveNext(); - } - - public void Reset() - { - throw new NotImplementedException(); - } - - object IEnumerator.Current => Current; - public T Current => GetCurrent(); public T ReallyCurrent => GetReallyCurrent(); public bool HasElement() @@ -89,7 +72,6 @@ namespace ShardingCore.Sharding.Enumerators { _syncSource?.Dispose(); } - public T GetCurrent() { if (skip) @@ -104,8 +86,26 @@ namespace ShardingCore.Sharding.Enumerators if (_syncSource != null) return _syncSource.Current; return default; } + public bool MoveNext() + { + if (skip) + { + skip = false; + return null != _syncSource.Current; + } + return _syncSource.MoveNext(); + } #endif + + + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; #if EFCORE2 public void Dispose() { @@ -121,8 +121,8 @@ namespace ShardingCore.Sharding.Enumerators } return await _asyncSource.MoveNext(cancellationToken); } - public T Current => skip ? default : SourceCurrent(); - public T ReallyCurrent => SourceCurrent(); + public T Current => GetCurrent(); + public T ReallyCurrent => GetReallyCurrent(); public bool HasElement() { return null != SourceCurrent(); @@ -144,6 +144,30 @@ namespace ShardingCore.Sharding.Enumerators private bool tryGetCurrentError = false; + public T GetCurrent() + { + if (skip) + return default; + if (_asyncSource != null) return SourceCurrent(); + if (_syncSource != null) return _syncSource.Current; + return default; + } + public T GetReallyCurrent() + { + if (_asyncSource != null) return SourceCurrent(); + if (_syncSource != null) return _syncSource.Current; + return default; + } + public bool MoveNext() + { + if (skip) + { + skip = false; + return null != _syncSource.Current; + } + return _syncSource.MoveNext(); + } + #endif } } \ No newline at end of file diff --git a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs index 7332a081..117079b9 100644 --- a/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs +++ b/src/ShardingCore/Sharding/ShardingQueryExecutors/DefaultShardingQueryExecutor.cs @@ -12,6 +12,9 @@ using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines; +#if EFCORE2 +using Microsoft.EntityFrameworkCore.Internal; +#endif namespace ShardingCore.Sharding.ShardingQueryExecutors { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index 7aa5a660..d8f7b300 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -34,15 +34,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines #endif #if EFCORE2 - private async Task> GetAsyncEnumerator(IQueryable newQueryable) - { - var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator(); - await enumator.MoveNext(); - return enumator; - } IAsyncEnumerator IAsyncEnumerable.GetEnumerator() { - return GetShardingEnumerator(); + return ((IAsyncEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) + .GetEnumerator(); } #endif @@ -50,7 +45,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines public IEnumerator GetEnumerator() { - return new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync() + return ((IEnumerable)new EnumeratorShardingQueryExecutor(_mergeContext).ExecuteAsync()) .GetEnumerator(); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs index 370a86f9..ab0879b4 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorAsyncStreamMergeEngine.cs @@ -10,6 +10,9 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +#if EFCORE2 +using Microsoft.EntityFrameworkCore.Extensions.Internal; +#endif namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions { @@ -57,9 +60,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. } public async Task> DoGetAsyncEnumerator(IQueryable newQueryable) { +#if !EFCORE2 var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator(); await enumator.MoveNextAsync(); return enumator; +#endif +#if EFCORE2 + var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator(); + await enumator.MoveNext(); + return enumator; +#endif } public IEnumerator DoGetEnumerator(IQueryable newQueryable) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs index 1b5c0f84..41d907f3 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/EnumeratorStreamMergeEngines/Abstractions/AbstractEnumeratorStreamMergeEngine.cs @@ -32,11 +32,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. public abstract IStreamMergeAsyncEnumerator GetShardingAsyncEnumerator(bool async, CancellationToken cancellationToken = new CancellationToken()); - public IAsyncEnumerator GetAsyncEnumerator( +#if !EFCORE2 + public IAsyncEnumerator GetAsyncEnumerator( CancellationToken cancellationToken = new CancellationToken()) { return GetShardingAsyncEnumerator(true,cancellationToken); } +#endif + +#if EFCORE2 + IAsyncEnumerator IAsyncEnumerable.GetEnumerator() + { + return GetShardingAsyncEnumerator(true); + } + +#endif public IEnumerator GetEnumerator() { @@ -58,5 +68,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines. }); } } + } }