This commit is contained in:
xuejiaming 2021-08-20 22:07:44 +08:00
parent 9309e587c8
commit 5a4dee9360
17 changed files with 444 additions and 100 deletions

View File

@ -6,11 +6,11 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.18" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.2.6" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src3x\ShardingCore.3x\ShardingCore.3x.csproj" />
<ProjectReference Include="..\..\src2x\ShardingCore.2x\ShardingCore.2x.csproj" />
</ItemGroup>
</Project>

View File

@ -24,7 +24,7 @@ namespace Sample.SqlServer
//services.AddDbContext<DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx3;Integrated Security=True"));
services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx3;Integrated Security=True;MultipleActiveResultSets=True;")
services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx2;Integrated Security=True;MultipleActiveResultSets=True;")
,op =>
{
op.EnsureCreatedWithOutShardingTable = true;

View File

@ -14,6 +14,10 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
#endif
namespace ShardingCore.EFCores
{
/**
@ -38,12 +42,11 @@ namespace ShardingCore.EFCores
{
return _currentContext;
}
private TResult EnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query,bool async)
private TResult EnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async)
{
Type queryEntityType ;
Type queryEntityType;
if (async)
queryEntityType= typeof(TResult).GetGenericArguments()[0];
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
@ -114,8 +117,12 @@ namespace ShardingCore.EFCores
}
#if !EFCORE2
throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]");
#endif
#if EFCORE2
throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]");
#endif
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
@ -126,6 +133,8 @@ namespace ShardingCore.EFCores
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
#if !EFCORE2
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
@ -194,19 +203,26 @@ namespace ShardingCore.EFCores
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
public Func<QueryContext, Task<TResult>> CreateCompiledAsyncTaskQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
public Func<QueryContext, TResult> CreateCompiledAsyncQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
#endif
private TResult GenericMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
//type = type.MakeGenericType(queryEntityType);
//var queryable = Activator.CreateInstance(type, query);
//var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
var queryEntityType = query.GetQueryEntityType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
@ -220,16 +236,6 @@ namespace ShardingCore.EFCores
private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
//type = type.MakeGenericType(queryEntityType);
//var queryable = Activator.CreateInstance(type, query);
//var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
@ -241,15 +247,6 @@ namespace ShardingCore.EFCores
private TResult EnsureMergeExecute2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
//type = type.MakeGenericType(queryEntityType);
//var queryable = Activator.CreateInstance(type, query);
//var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
if (async)
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
else
@ -261,34 +258,178 @@ namespace ShardingCore.EFCores
var @params = async ? new object[] { cancellationToken } : new object[0];
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
}
//private TResult EnsureMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
//{
#if EFCORE2
// //Type type = typeof(EnumerableQuery<>);
// //type = type.MakeGenericType(queryEntityType);
// //var queryable = Activator.CreateInstance(type, query);
private IAsyncEnumerable<TResult> AsyncEnumerableExecute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{
Type queryEntityType = query.Type.GetSequenceType();
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
// //var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
// //if (streamMergeContextMethod == null)
// // throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
// //var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
// streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
// var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
// var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync));
// if (streamEngineMethod == null)
// throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
// return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
//}
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (IAsyncEnumerable<TResult>)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
private Task<TResult> EnumerableExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async)
{
Type queryEntityType;
if (async)
queryEntityType = typeof(TResult).GetGenericArguments()[0];
else
{
queryEntityType = query.Type.GetSequenceType();
}
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (Task<TResult>)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
private Task<TResult> GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
var queryEntityType = query.GetQueryEntityType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (Task<TResult>)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params);
}
private Task<TResult> EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = async ? new object[] { cancellationToken } : new object[0];
return (Task<TResult>)streamEngineMethod.Invoke(streamEngine, @params);
}
private Task<TResult> EnsureMergeExecuteAsync2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
{
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult));
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureMergeResult<object>.MergeResultAsync) );
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
var @params = new object[] { cancellationToken };
return (Task<TResult>)streamEngineMethod.Invoke(streamEngine, @params);
}
#endif
#if EFCORE2
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
var currentDbContext = GetCurrentDbContext().Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
return AsyncEnumerableExecute<TResult>(shardingDbContext, query);
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
var async = true;
if (currentDbContext is IShardingDbContext shardingDbContext)
{
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
return EnumerableExecuteAsync<TResult>(shardingDbContext, query, async);
}
if (query is MethodCallExpression methodCallExpression)
{
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.First):
return GenericMergeExecuteAsync<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last):
return GenericMergeExecuteAsync<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single):
return GenericMergeExecuteAsync<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count):
return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecuteAsync<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecuteAsync<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecuteAsync<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Max):
return GenericMergeExecuteAsync<TResult>(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Min):
return EnsureMergeExecuteAsync<TResult>(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Sum):
return EnsureMergeExecuteAsync2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.Average):
return EnsureMergeExecuteAsync2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression,cancellationToken);
case nameof(Enumerable.Contains):
return EnsureMergeExecuteAsync<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
}
}
throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]");
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
public Func<QueryContext, TResult> CreateCompiledAsyncQuery<TResult>(Expression query)
public Func<QueryContext, IAsyncEnumerable<TResult>> CreateCompiledAsyncEnumerableQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
public Func<QueryContext, Task<TResult>> CreateCompiledAsyncTaskQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
#endif
}
}

View File

@ -603,7 +603,7 @@ namespace ShardingCore.Sharding
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
if (!isBeginTransaction)
Database.CurrentTransaction.Commit(cancellationToken);
Database.CurrentTransaction.Commit();
}
finally
{

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
using ShardingCore.Extensions;
@ -54,11 +55,21 @@ namespace ShardingCore.Sharding.Enumerators
return _mergeContext.SelectContext.SelectProperties.Where(o => !o.IsAggregateMethod)
.Select(o => first.GetValueByExpression(o.PropertyName)).ToList();
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken=new CancellationToken())
#endif
{
if (_queue.IsEmpty())
return false;
var hasNext=await SetCurrentValue();
#if !EFCORE2
var hasNext = await SetCurrentValue();
#endif
#if EFCORE2
var hasNext = await SetCurrentValue(cancellationToken);
#endif
if (hasNext)
{
CurrentGroupValues = _queue.IsEmpty() ? new List<object>(0) : GetCurrentGroupValues(_queue.Peek());
@ -77,7 +88,12 @@ namespace ShardingCore.Sharding.Enumerators
return true;
}
#if !EFCORE2
private async ValueTask<bool> SetCurrentValue()
#endif
#if EFCORE2
private async Task<bool> SetCurrentValue(CancellationToken cancellationToken=new CancellationToken())
#endif
{
CurrentValue = default;
var currentValues = new List<T>();
@ -87,7 +103,12 @@ namespace ShardingCore.Sharding.Enumerators
currentValues.Add(current);
var first = _queue.Poll();
#if !EFCORE2
if (await first.MoveNextAsync())
#endif
#if EFCORE2
if (await first.MoveNext(cancellationToken))
#endif
{
_queue.Offer(first);
}
@ -157,6 +178,7 @@ namespace ShardingCore.Sharding.Enumerators
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
#if !EFCORE2
public async ValueTask DisposeAsync()
{
@ -165,6 +187,16 @@ namespace ShardingCore.Sharding.Enumerators
await enumerator.DisposeAsync();
}
}
#endif
#if EFCORE2
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
#endif
public T Current => CurrentValue;
}

View File

@ -1,20 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.PriorityQueues;
namespace ShardingCore.Sharding.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:49:09
* @Email: 326308290@qq.com
*/
public class MultiOrderStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:49:09
* @Email: 326308290@qq.com
*/
public class MultiOrderStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IEnumerable<IStreamMergeAsyncEnumerator<T>> _enumerators;
private readonly PriorityQueue<IOrderStreamMergeAsyncEnumerator<T>> _queue;
@ -44,7 +45,12 @@ namespace ShardingCore.Sharding.Enumerators
_currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek();
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
#endif
{
if (_queue.IsEmpty())
return false;
@ -55,8 +61,14 @@ namespace ShardingCore.Sharding.Enumerators
}
var first = _queue.Poll();
#if !EFCORE2
if (await first.MoveNextAsync())
#endif
#if EFCORE2
if (await first.MoveNext(cancellationToken))
#endif
{
_queue.Offer(first);
}
@ -86,8 +98,9 @@ namespace ShardingCore.Sharding.Enumerators
return _currentEnumerator != null && _currentEnumerator.HasElement();
}
public T ReallyCurrent => _queue.IsEmpty()?default(T):_queue.Peek().ReallyCurrent;
public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent;
#if !EFCORE2
public async ValueTask DisposeAsync()
{
@ -96,6 +109,17 @@ namespace ShardingCore.Sharding.Enumerators
await enumerator.DisposeAsync();
}
}
#endif
#if EFCORE2
public void Dispose()
{
foreach (var enumerator in _enumerators)
{
enumerator.Dispose();
}
}
#endif
public T Current => skipFirst ? default : _currentEnumerator.Current;

View File

@ -1,21 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.Enumerators
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:46:32
* @Email: 326308290@qq.com
*/
public class OrderStreamMergeAsyncEnumerator<T>:IOrderStreamMergeAsyncEnumerator<T>
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:46:32
* @Email: 326308290@qq.com
*/
public class OrderStreamMergeAsyncEnumerator<T> : IOrderStreamMergeAsyncEnumerator<T>
{
/// <summary>
/// 合并数据上下文
/// </summary>
@ -35,17 +36,26 @@ namespace ShardingCore.Sharding.Enumerators
{
_orderValues = HasElement() ? GetCurrentOrderValues() : new List<IComparable>(0);
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
#endif
{
#if !EFCORE2
var has = await _enumerator.MoveNextAsync();
#endif
#if EFCORE2
var has = await _enumerator.MoveNext(cancellationToken);
#endif
SetOrderValues();
return has;
}
public T Current =>_enumerator.Current;
public T Current => _enumerator.Current;
public bool SkipFirst()
{
@ -79,9 +89,11 @@ namespace ShardingCore.Sharding.Enumerators
public int CompareTo(IOrderStreamMergeAsyncEnumerator<T> other)
{
int i = 0;
foreach (var order in _mergeContext.Orders) {
foreach (var order in _mergeContext.Orders)
{
int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc);
if (0 != result) {
if (0 != result)
{
return result;
}
i++;
@ -93,11 +105,18 @@ namespace ShardingCore.Sharding.Enumerators
{
return _orderValues ?? new List<IComparable>(0);
}
#if !EFCORE2
public ValueTask DisposeAsync()
{
return _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -1,24 +1,26 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
{
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:39:52
* @Email: 326308290@qq.com
*/
public class PaginationStreamMergeAsyncEnumerator<T>:IStreamMergeAsyncEnumerator<T>
/*
* @Author: xjm
* @Description:
* @Date: Sunday, 15 August 2021 06:39:52
* @Email: 326308290@qq.com
*/
public class PaginationStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly IStreamMergeAsyncEnumerator<T> _enumerator;
private readonly int? _skip;
private readonly int? _take;
private int realSkip=0;
private int realSkip = 0;
private int realTake = 0;
public PaginationStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext,IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
public PaginationStreamMergeAsyncEnumerator(StreamMergeContext<T> mergeContext, IEnumerable<IStreamMergeAsyncEnumerator<T>> sources)
{
_mergeContext = mergeContext;
_skip = mergeContext.Skip;
@ -26,23 +28,37 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
if (_mergeContext.HasGroupQuery())
_enumerator = new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, sources);
else
_enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext,sources);
_enumerator = new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, sources);
}
#if !EFCORE2
public async ValueTask<bool> MoveNextAsync()
#endif
#if EFCORE2
public async Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
#endif
{
//如果合并数据的时候不需要跳过也没有take多少那么就是直接next
while (_skip.GetValueOrDefault() > this.realSkip)
{
#if !EFCORE2
var has = await _enumerator.MoveNextAsync();
#endif
#if EFCORE2
var has = await _enumerator.MoveNext(cancellationToken);
#endif
realSkip++;
if (!has)
return false;
}
#if !EFCORE2
var next = await _enumerator.MoveNextAsync();
#endif
#if EFCORE2
var next = await _enumerator.MoveNext(cancellationToken);
#endif
if (next)
{
if (_take.HasValue)
@ -68,10 +84,19 @@ namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
}
public T ReallyCurrent => _enumerator.ReallyCurrent;
#if !EFCORE2
public ValueTask DisposeAsync()
{
return _enumerator.DisposeAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_enumerator.Dispose();
}
#endif
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.Enumerators
@ -21,6 +22,7 @@ namespace ShardingCore.Sharding.Enumerators
_source = source;
skip = true;
}
#if !EFCORE2
public ValueTask DisposeAsync()
{
return _source.DisposeAsync();
@ -35,6 +37,24 @@ namespace ShardingCore.Sharding.Enumerators
}
return await _source.MoveNextAsync();
}
#endif
#if EFCORE2
public void Dispose()
{
_source.Dispose();
}
public async Task<bool> MoveNext(CancellationToken cancellationToken=new CancellationToken())
{
if (skip)
{
skip = false;
return null!=_source.Current;
}
return await _source.MoveNext(cancellationToken);
}
#endif
public T Current => skip?default:_source.Current;
public bool SkipFirst()
@ -46,12 +66,27 @@ namespace ShardingCore.Sharding.Enumerators
}
return false;
}
#if !EFCORE2
public bool HasElement()
{
return null != _source.Current;
}
#endif
#if EFCORE2
public bool HasElement()
{
try
{
return null != _source.Current;
}
catch
{
return false;
}
}
#endif
public T ReallyCurrent => _source.Current;
}
}

View File

@ -25,7 +25,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
var secondExpression = GetSecondExpression();
if (!(secondExpression is ConstantExpression constantExpression))
{
throw new ShardingCoreException($"not found constant {methodCallExpression.Print()}");
#if !EFCORE2
throw new ShardingCoreException($"not found constant {methodCallExpression.Print()}");
#endif
#if EFCORE2
throw new ShardingCoreException($"not found constant {methodCallExpression}");
#endif
}
_constantItem = (TEntity)constantExpression.Value;
}
@ -33,7 +39,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
{
if (!(secondExpression is ConstantExpression))
{
#if !EFCORE2
throw new InvalidOperationException(_methodCallExpression.Print());
#endif
#if EFCORE2
throw new InvalidOperationException(_methodCallExpression.ToString());
#endif
}
return queryable;

View File

@ -31,8 +31,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
return queryable.Select(selector);
}
throw new ShardingCoreException($"expression is not selector:{secondExpression.Print()}");
#if !EFCORE2
throw new ShardingCoreException($"expression is not selector:{secondExpression.Print()}");
#endif
#if EFCORE2
throw new ShardingCoreException($"expression is not selector:{secondExpression}");
#endif
}
return queryable;
}

View File

@ -28,7 +28,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
{
return queryable.Where(predicate);
}
#if !EFCORE2
throw new InvalidOperationException(_methodCallExpression.Print());
#endif
#if EFCORE2
throw new InvalidOperationException(_methodCallExpression.ToString());
#endif
}
}

View File

@ -31,7 +31,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
}
throw new ShardingCoreException($"expression is not selector:{secondExpression.Print()}");
#if !EFCORE2
throw new ShardingCoreException($"expression is not selector:{secondExpression.Print()}");
#endif
#if EFCORE2
throw new ShardingCoreException($"expression is not selector:{secondExpression}");
#endif
}
return queryable;
}

View File

@ -31,7 +31,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
return queryable.Where(predicate);
}
}
#if !EFCORE2
throw new InvalidOperationException(_methodCallExpression.Print());
#endif
#if EFCORE2
throw new InvalidOperationException(_methodCallExpression.ToString());
#endif
}
}

View File

@ -31,7 +31,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
public AbstractInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
_methodCallExpression = methodCallExpression;
var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type)) ?? throw new InvalidOperationException(methodCallExpression.Print());
var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type))
#if !EFCORE2
?? throw new InvalidOperationException(methodCallExpression.Print());
#endif
#if EFCORE2
?? throw new InvalidOperationException(methodCallExpression.ToString());
#endif
_queryable = new EnumerableQuery<TEntity>(expression);
_secondExpression = methodCallExpression.Arguments.FirstOrDefault(o => !typeof(IQueryable).IsAssignableFrom(o.Type));
@ -42,7 +48,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
else
{
if (methodCallExpression.Arguments.Count == 2)
throw new InvalidOperationException(methodCallExpression.Print());
{
#if !EFCORE2
throw new InvalidOperationException(methodCallExpression.Print());
#endif
#if EFCORE2
throw new InvalidOperationException(methodCallExpression.ToString());
#endif
}
}
_mergeContext = ShardingContainer.GetService<IStreamMergeContextFactory>().Create(_queryable, shardingDbContext);

View File

@ -138,8 +138,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
return ConvertSum(average);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
}
public override async Task<TEnsureResult> MergeResultAsync(
@ -253,8 +259,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
return ConvertSum(average);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
}
private TEnsureResult ConvertSum<TNumber>(TNumber number)

View File

@ -110,9 +110,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
var sum = result.Sum();
return ConvertSum(sum);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
}
public override async Task<TEnsureResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
@ -198,8 +203,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
return ConvertSum(sum);
}
#if !EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}");
#endif
#if EFCORE2
throw new ShardingCoreException(
$"not support {GetMethodCallExpression()} result {typeof(TEnsureResult)}");
#endif
}
private TEnsureResult ConvertSum<TNumber>(TNumber number)
{