support efcore 2x 3x 5x

This commit is contained in:
xuejiaming 2021-09-03 21:45:51 +08:00
parent 7f064523d8
commit 032eccc575
14 changed files with 104 additions and 156 deletions

View File

@ -30,7 +30,7 @@ namespace ShardingCore.Core.VirtualTables
/// <summary>
/// 是否启用分页配置
/// </summary>
bool EnablePagination => PaginationMetadata != null;
bool EnablePagination { get; }
/// <summary>
/// 获取所有的物理表

View File

@ -31,6 +31,7 @@ namespace ShardingCore.Core.VirtualTables
public ShardingTableConfig ShardingConfig { get; }
public PaginationMetadata PaginationMetadata { get; }
public bool EnablePagination => PaginationMetadata != null;
private readonly List<IPhysicTable> _physicTables = new List<IPhysicTable>();

View File

@ -68,104 +68,12 @@ namespace ShardingCore.EFCores
#endif
#if EFCORE2
private IAsyncEnumerable<TResult> AsyncEnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{
Type queryEntityType = query.Type.GetSequenceType();
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (IAsyncEnumerable<TResult>)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
private Task<TResult> EnumerableExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async)
{
Type queryEntityType;
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (Task<TResult>)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
private Task<TResult> GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (Task<TResult>)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
}
private Task<TResult> EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (Task<TResult>)streamEngineMethod.Invoke(streamEngine, @params);
}
private Task<TResult> GenericMergeExecuteAsync2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
var resultType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType,resultType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (Task<TResult>)streamEngineMethod.MakeGenericMethod(new Type[] { resultType }).Invoke(streamEngine, @params);
}
private Task<TResult> EnsureMergeExecuteAsync2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureMergeResult<object>.MergeResultAsync) );
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = new object[] { cancellationToken };
return (Task<TResult>)streamEngineMethod.Invoke(streamEngine, @params);
}
#endif
#if EFCORE2
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query, cancellationToken);
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query);
}
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)

View File

@ -2,6 +2,9 @@ using System.Linq.Expressions;
using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Enumerators;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
#endif
namespace ShardingCore.Sharding.Abstractions
{

View File

@ -3,6 +3,7 @@ using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
@ -23,6 +24,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
_inMemoryStreamMergeAsyncEnumerator = inMemoryStreamMergeAsyncEnumerator;
}
#if !EFCORE2
public async ValueTask DisposeAsync()
{
await _inMemoryStreamMergeAsyncEnumerator.DisposeAsync();
@ -45,6 +47,25 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
return _reverseEnumerator.MoveNext();
}
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
if (_first)
{
LinkedList<T> _reverseCollection = new LinkedList<T>();
while (await _inMemoryStreamMergeAsyncEnumerator.MoveNext(cancellationToken))
{
_reverseCollection.AddFirst(_inMemoryStreamMergeAsyncEnumerator.GetCurrent());
}
_reverseEnumerator = _reverseCollection.GetEnumerator();
_first = false;
}
return _reverseEnumerator.MoveNext();
}
#endif
public bool MoveNext()
{
@ -68,6 +89,7 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();

View File

@ -233,15 +233,6 @@ namespace ShardingCore.Sharding.Enumerators
}
}
#endif
#if EFCORE2
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
#endif
public void Reset()

View File

@ -138,16 +138,6 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
}
#endif
#if EFCORE2
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
#endif
@ -161,7 +151,10 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
public T Current => skipFirst ? default : _currentEnumerator.GetCurrent();
public void Dispose()
{
_currentEnumerator.Dispose();
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
}
}

View File

@ -135,12 +135,6 @@ namespace ShardingCore.Sharding.Enumerators
return _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -132,13 +132,6 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
return _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -60,23 +60,6 @@ namespace ShardingCore.Sharding.Enumerators
return await _asyncSource.MoveNextAsync();
}
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _syncSource.Current;
}
return _syncSource.MoveNext();
}
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
public T Current => GetCurrent();
public T ReallyCurrent => GetReallyCurrent();
public bool HasElement()
@ -89,7 +72,6 @@ namespace ShardingCore.Sharding.Enumerators
{
_syncSource?.Dispose();
}
public T GetCurrent()
{
if (skip)
@ -104,8 +86,26 @@ namespace ShardingCore.Sharding.Enumerators
if (_syncSource != null) return _syncSource.Current;
return default;
}
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _syncSource.Current;
}
return _syncSource.MoveNext();
}
#endif
public void Reset()
{
throw new NotImplementedException();
}
object IEnumerator.Current => Current;
#if EFCORE2
public void Dispose()
{
@ -121,8 +121,8 @@ namespace ShardingCore.Sharding.Enumerators
}
return await _asyncSource.MoveNext(cancellationToken);
}
public T Current => skip ? default : SourceCurrent();
public T ReallyCurrent => SourceCurrent();
public T Current => GetCurrent();
public T ReallyCurrent => GetReallyCurrent();
public bool HasElement()
{
return null != SourceCurrent();
@ -144,6 +144,30 @@ namespace ShardingCore.Sharding.Enumerators
private bool tryGetCurrentError = false;
public T GetCurrent()
{
if (skip)
return default;
if (_asyncSource != null) return SourceCurrent();
if (_syncSource != null) return _syncSource.Current;
return default;
}
public T GetReallyCurrent()
{
if (_asyncSource != null) return SourceCurrent();
if (_syncSource != null) return _syncSource.Current;
return default;
}
public bool MoveNext()
{
if (skip)
{
skip = false;
return null != _syncSource.Current;
}
return _syncSource.MoveNext();
}
#endif
}
}

View File

@ -12,6 +12,9 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
#endif
namespace ShardingCore.Sharding.ShardingQueryExecutors
{

View File

@ -34,15 +34,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines
#endif
#if EFCORE2
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
return enumator;
}
IAsyncEnumerator<T> IAsyncEnumerable<T>.GetEnumerator()
{
return GetShardingEnumerator();
return ((IAsyncEnumerable<T>)new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync())
.GetEnumerator();
}
#endif
@ -50,7 +45,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
return new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync()
return ((IEnumerable<T>)new EnumeratorShardingQueryExecutor<T>(_mergeContext).ExecuteAsync())
.GetEnumerator();
}

View File

@ -10,6 +10,9 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
@ -57,9 +60,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
}
public async Task<IAsyncEnumerator<TEntity>> DoGetAsyncEnumerator(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
return enumator;
#endif
}
public IEnumerator<TEntity> DoGetEnumerator(IQueryable<TEntity> newQueryable)
{

View File

@ -32,11 +32,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
public abstract IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken());
public IAsyncEnumerator<TEntity> GetAsyncEnumerator(
#if !EFCORE2
public IAsyncEnumerator<TEntity> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
return GetShardingAsyncEnumerator(true,cancellationToken);
}
#endif
#if EFCORE2
IAsyncEnumerator<TEntity> IAsyncEnumerable<TEntity>.GetEnumerator()
{
return GetShardingAsyncEnumerator(true);
}
#endif
public IEnumerator<TEntity> GetEnumerator()
{
@ -58,5 +68,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
});
}
}
}
}