From 9c7789ca29cc231d9aab81bd2bd9d542888424b0 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Thu, 19 Aug 2021 15:08:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=90=8C=E6=AD=A5=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Controllers/ValuesController.cs | 14 ++ .../EFCores/ShardingQueryCompiler.cs | 160 +++++++++++---- .../Abstractions/IStreamMergeEngine.cs | 1 + .../IOrderStreamMergeEnumerator.cs | 16 ++ .../IStreamMergeEnumerator.cs | 3 +- ...ultiAggregateOrderStreamMergeEnumerator.cs | 183 ++++++++++++++++++ .../MultiOrderStreamMergeEnumerator.cs | 110 +++++++++++ .../OrderStreamMergeEnumerator.cs | 110 +++++++++++ .../PaginationStreamMergeEnumerator.cs | 85 ++++++++ .../StreamMergeSync/StreamMergeEnumerator.cs | 63 ++++++ ...nsureMethodCallInMemoryAsyncMergeEngine.cs | 5 +- ...nericMethodCallInMemoryAsyncMergeEngine.cs | 5 +- .../AbstractInMemoryAsyncMergeEngine.cs | 50 +++-- ...ncMergeResult.cs => IEnsureMergeResult.cs} | 13 +- ...cMergeResult.cs => IGenericMergeResult.cs} | 14 +- .../AverageAsyncInMemoryMergeEngine.cs | 144 ++++++++++++-- .../MaxAsyncInMemoryMergeEngine.cs | 8 +- .../MinAsyncInMemoryMergeEngine.cs | 8 +- .../SumAsyncInMemoryMergeEngine.cs | 111 +++++++++-- .../AllAsyncInMemoryMergeEngine.cs | 9 +- .../AnyAsyncInMemoryMergeEngine.cs | 10 +- .../AsyncEnumerableStreamMergeEngine.cs | 59 +++++- .../ContainsAsyncInMemoryMergeEngine.cs | 9 +- .../CountAsyncInMemoryMergeEngine.cs | 10 +- .../FirstAsyncInMemoryMergeEngine.cs | 14 +- .../FirstOrDefaultAsyncInMemoryMergeEngine.cs | 14 +- .../LastAsyncInMemoryMergeEngine.cs | 14 +- .../LastOrDefaultAsyncInMemoryMergeEngine.cs | 14 +- .../LongCountAsyncInMemoryMergeEngine.cs | 10 +- .../SingleAsyncInMemoryMergeEngine.cs | 14 +- ...SingleOrDefaultAsyncInMemoryMergeEngine.cs | 14 +- src/ShardingCore/ShardingCore.csproj | 4 - 32 files changed, 1191 insertions(+), 107 deletions(-) create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs rename src/ShardingCore/Sharding/Enumerators/{ => StreamMergeSync}/IStreamMergeEnumerator.cs (83%) create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs create mode 100644 src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs rename src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/{IEnsureAsyncMergeResult.cs => IEnsureMergeResult.cs} (63%) rename src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/{IGenericAsyncMergeResult.cs => IGenericMergeResult.cs} (58%) diff --git a/samples/Sample.SqlServer/Controllers/ValuesController.cs b/samples/Sample.SqlServer/Controllers/ValuesController.cs index 5056656c..1f9b240a 100644 --- a/samples/Sample.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.SqlServer/Controllers/ValuesController.cs @@ -32,8 +32,22 @@ namespace Sample.SqlServer.Controllers var resultx2 = await _defaultTableDbContext.Set().CountAsync(o => o.Age<=10); var resultx = await _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefaultAsync(); var resultx33 = await _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o=>o.Id).FirstOrDefaultAsync(); + var resulxxt = await _defaultTableDbContext.Set().Where(o => o.Id == "198").ToListAsync(); var result = await _defaultTableDbContext.Set().ToListAsync(); + + + + var sresultx11231 = _defaultTableDbContext.Set().Where(o => o.Age == 198198).Select(o => o.Id).Contains("1981"); + var sresultx1121 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Sum(o => o.Age); + var sresultx111 = _defaultTableDbContext.Set().FirstOrDefault(o => o.Id == "198"); + var sresultx2 = _defaultTableDbContext.Set().Count(o => o.Age <= 10); + var sresultx = _defaultTableDbContext.Set().Where(o => o.Id == "198").FirstOrDefault(); + var sresultx33 = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).FirstOrDefault(); + var sresultxc = _defaultTableDbContext.Set().Where(o => o.Id == "198").Select(o => o.Id).ToList(); + var sresultxasdc = _defaultTableDbContext.Set().Where(o => o.Id == "198").ToList(); + var sresult = _defaultTableDbContext.Set().ToList(); + var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98"); sysUserMod98.Name = "name_update"+new Random().Next(1,99)+"_98"; await _defaultTableDbContext.SaveChangesAsync(); diff --git a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs index f3fff6ab..1c41b1ef 100644 --- a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs +++ b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs @@ -51,34 +51,105 @@ namespace ShardingCore.EFCores { return _currentContext; } + + private TResult EnumerableExecute(IShardingDbContext shardingDbContext, Expression query,bool async) + { + Type queryEntityType ; + if (async) + queryEntityType= typeof(TResult).GetGenericArguments()[0]; + else + { + queryEntityType = query.Type.GetSequenceType(); + } + Type type = typeof(EnumerableQuery<>); + type = type.MakeGenericType(queryEntityType); + var queryable = Activator.CreateInstance(type, query); + + var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); + if (streamMergeContextMethod == null) + throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); + var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); + + + Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); + streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); + return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); + } public TResult Execute(Expression query) { - throw new NotImplementedException(); + var async = false; + var currentDbContext = GetCurrentDbContext().Context; + + if (currentDbContext is IShardingDbContext shardingDbContext) + { + //如果根表达式为iqueryable表示需要迭代 + if (query.Type.HasImplementedRawGeneric(typeof(IQueryable<>))) + { + return EnumerableExecute(shardingDbContext, query, async); + } + + if (query is MethodCallExpression methodCallExpression) + { + switch (methodCallExpression.Method.Name) + { + + case nameof(Enumerable.First): + return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.FirstOrDefault): + return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Last): + return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.LastOrDefault): + return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Single): + return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.SingleOrDefault): + return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Count): + return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.LongCount): + return EnsureMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Any): + return EnsureMergeExecute(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.All): + return EnsureMergeExecute(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Max): + return GenericMergeExecute(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Min): + return EnsureMergeExecute(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Sum): + return EnsureMergeExecute2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Average): + return EnsureMergeExecute2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, default); + case nameof(Enumerable.Contains): + return EnsureMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, default); + } + } + + + + throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]"); + //IQueryable queryable = new EnumerableQuery(expression); + //var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext); + + //var streamMergeEngine = AsyncEnumerableStreamMergeEngine.Create(streamMergeContext); + //return streamMergeEngine.GetAsyncEnumerator(); + } + + throw new ShardingCoreException("db context operator is not IShardingDbContext"); } public TResult ExecuteAsync(Expression query, CancellationToken cancellationToken) { var currentDbContext = GetCurrentDbContext().Context; + var async = true; if (currentDbContext is IShardingDbContext shardingDbContext) { if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>))) { - var queryEntityType = typeof(TResult).GetGenericArguments()[0]; - Type type = typeof(EnumerableQuery<>); - type = type.MakeGenericType(queryEntityType); - var queryable = Activator.CreateInstance(type, query); - - var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); - if (streamMergeContextMethod == null) - throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); - var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable, shardingDbContext }); - - - Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>); - streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); - return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext); + return EnumerableExecute(shardingDbContext, query, async); } @@ -90,35 +161,35 @@ namespace ShardingCore.EFCores { case nameof(Enumerable.First): - return GenericMergeExecuteAsync(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.FirstOrDefault): - return GenericMergeExecuteAsync(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Last): - return GenericMergeExecuteAsync(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.LastOrDefault): - return GenericMergeExecuteAsync(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Single): - return GenericMergeExecuteAsync(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.SingleOrDefault): - return GenericMergeExecuteAsync(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Count): - return EnsureMergeExecuteAsync(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.LongCount): - return EnsureMergeExecuteAsync(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Any): - return EnsureMergeExecuteAsync(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.All): - return EnsureMergeExecuteAsync(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Max): - return GenericMergeExecuteAsync(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return GenericMergeExecute(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Min): - return EnsureMergeExecuteAsync(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Sum): - return EnsureMergeExecuteAsync2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Average): - return EnsureMergeExecuteAsync2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken); case nameof(Enumerable.Contains): - return EnsureMergeExecuteAsync(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); + return EnsureMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); } } @@ -138,7 +209,7 @@ namespace ShardingCore.EFCores - private TResult GenericMergeExecuteAsync(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) + private TResult GenericMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) { //Type type = typeof(EnumerableQuery<>); @@ -152,14 +223,15 @@ namespace ShardingCore.EFCores var queryEntityType = query.GetQueryEntityType(); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IGenericAsyncMergeResult.MergeResultAsync)); + var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult)); if (streamEngineMethod == null) throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, new object[] { cancellationToken }); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, @params); } - private TResult EnsureMergeExecuteAsync(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) + private TResult EnsureMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) { //Type type = typeof(EnumerableQuery<>); @@ -173,13 +245,14 @@ namespace ShardingCore.EFCores streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType()); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult.MergeResultAsync)); + var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult)); if (streamEngineMethod == null) throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken }); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.Invoke(streamEngine, @params); } - private TResult EnsureMergeExecuteAsync2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) + private TResult EnsureMergeExecute2(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) { //Type type = typeof(EnumerableQuery<>); @@ -190,15 +263,18 @@ namespace ShardingCore.EFCores //if (streamMergeContextMethod == null) // throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); //var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext }); - - streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]); + if (async) + streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]); + else + streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult)); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); - var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult.MergeResultAsync)); + var streamEngineMethod = streamMergeEngineType.GetMethod(async ? nameof(IEnsureMergeResult.MergeResultAsync) : nameof(IEnsureMergeResult.MergeResult)); if (streamEngineMethod == null) throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); - return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken }); + var @params = async ? new object[] { cancellationToken } : new object[0]; + return (TResult)streamEngineMethod.Invoke(streamEngine, @params); } - //private TResult EnsureMergeExecuteAsync(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) + //private TResult EnsureMergeExecute(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken) //{ // //Type type = typeof(EnumerableQuery<>); diff --git a/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs b/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs index 95eed29a..be78fbfd 100644 --- a/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/Abstractions/IStreamMergeEngine.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using ShardingCore.Sharding.Enumerators; +using ShardingCore.Sharding.Enumerators.StreamMergeSync; namespace ShardingCore.Sharding.Abstractions { diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs new file mode 100644 index 00000000..50ef08b0 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IOrderStreamMergeEnumerator.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ +/* +* @Author: xjm +* @Description: +* @Date: Sunday, 15 August 2021 06:44:33 +* @Email: 326308290@qq.com +*/ + public interface IOrderStreamMergeEnumerator:IStreamMergeEnumerator, IComparable> + { + List GetCompares(); + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/IStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs similarity index 83% rename from src/ShardingCore/Sharding/Enumerators/IStreamMergeEnumerator.cs rename to src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs index 1c310fad..0149a001 100644 --- a/src/ShardingCore/Sharding/Enumerators/IStreamMergeEnumerator.cs +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/IStreamMergeEnumerator.cs @@ -1,7 +1,6 @@ -using System; using System.Collections.Generic; -namespace ShardingCore.Sharding.Enumerators +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync { /* * @Author: xjm diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs new file mode 100644 index 00000000..9c697281 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiAggregateOrderStreamMergeEnumerator.cs @@ -0,0 +1,183 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using ShardingCore.Core.Internal.PriorityQueues; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Enumerators.AggregateExtensions; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ + /* + * @Author: xjm + * @Description: + * @Date: Sunday, 15 August 2021 06:43:26 + * @Email: 326308290@qq.com + */ + public class MultiAggregateOrderStreamMergeEnumerator : IStreamMergeEnumerator + { + + private readonly StreamMergeContext _mergeContext; + private readonly IEnumerable> _enumerators; + private readonly PriorityQueue> _queue; + private T CurrentValue; + private List CurrentGroupValues; + private bool _skipFirst; + + public MultiAggregateOrderStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> enumerators) + { + _mergeContext = mergeContext; + _enumerators = enumerators; + _queue = new PriorityQueue>(enumerators.Count()); + _skipFirst = true; + SetOrderEnumerator(); + } + + private void SetOrderEnumerator() + { + foreach (var source in _enumerators) + { + var orderStreamEnumerator = new OrderStreamMergeEnumerator(_mergeContext, source); + if (orderStreamEnumerator.HasElement()) + { + orderStreamEnumerator.SkipFirst(); + _queue.Offer(orderStreamEnumerator); + } + } + //设置第一个元素聚合的属性值 + CurrentGroupValues = _queue.IsEmpty() ? new List(0) : GetCurrentGroupValues(_queue.Peek()); + } + + private List GetCurrentGroupValues(IOrderStreamMergeEnumerator enumerator) + { + var first = enumerator.ReallyCurrent; + return _mergeContext.SelectContext.SelectProperties.Where(o => !o.IsAggregateMethod) + .Select(o => first.GetValueByExpression(o.PropertyName)).ToList(); + } + public bool MoveNext() + { + if (_queue.IsEmpty()) + return false; + var hasNext = SetCurrentValue(); + if (hasNext) + { + CurrentGroupValues = _queue.IsEmpty() ? new List(0) : GetCurrentGroupValues(_queue.Peek()); + } + return hasNext; + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + private bool EqualWithGroupValues() + { + var current = GetCurrentGroupValues(_queue.Peek()); + for (int i = 0; i < CurrentGroupValues.Count; i++) + { + if (!CurrentGroupValues[i].Equals(current[i])) + return false; + } + + return true; + } + private bool SetCurrentValue() + { + CurrentValue = default; + var currentValues = new List(); + while (EqualWithGroupValues()) + { + var current = _queue.Peek().Current; + currentValues.Add(current); + var first = _queue.Poll(); + + if (first.MoveNext()) + { + _queue.Offer(first); + } + + if (_queue.IsEmpty()) + { + break; + } + } + + MergeValue(currentValues); + + return true; + } + + private void MergeValue(List aggregateValues) + { + + if (aggregateValues.IsNotEmpty()) + { + CurrentValue = aggregateValues.First(); + if (aggregateValues.Count > 1) + { + var aggregates = _mergeContext.SelectContext.SelectProperties.Where(o => o.IsAggregateMethod).ToList(); + if (aggregates.IsNotEmpty()) + { + foreach (var aggregate in aggregates) + { + object aggregateValue = null; + if (aggregate.AggregateMethod == nameof(Queryable.Count)) + { + aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); + } + else if (aggregate.AggregateMethod == nameof(Queryable.Sum)) + { + aggregateValue = aggregateValues.AsQueryable().Sum(aggregate.PropertyName); + } + else if (aggregate.AggregateMethod == nameof(Queryable.Max)) + { + aggregateValue = aggregateValues.AsQueryable().Max(aggregate.PropertyName); + } + else if (aggregate.AggregateMethod == nameof(Queryable.Min)) + { + aggregateValue = aggregateValues.AsQueryable().Min(aggregate.PropertyName); + } + else if (aggregate.AggregateMethod == nameof(Queryable.Average)) + { + aggregateValue = aggregateValues.AsQueryable().Average(aggregate.PropertyName); + } + else + { + throw new InvalidOperationException($"method:{aggregate.AggregateMethod} invalid operation "); + } + CurrentValue.SetPropertyValue(aggregate.PropertyName, aggregateValue); + } + } + } + } + } + + + public bool SkipFirst() + { + return true; + } + + public bool HasElement() + { + return ReallyCurrent != null; + } + + public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; + + + public void Dispose() + { + foreach (var enumerator in _enumerators) + { + enumerator?.Dispose(); + } + } + + public T Current => CurrentValue; + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs new file mode 100644 index 00000000..bf6445f4 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/MultiOrderStreamMergeEnumerator.cs @@ -0,0 +1,110 @@ +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using ShardingCore.Core.Internal.PriorityQueues; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ + /* + * @Author: xjm + * @Description: + * @Date: Sunday, 15 August 2021 06:49:09 + * @Email: 326308290@qq.com + */ + public class MultiOrderStreamMergeEnumerator : IStreamMergeEnumerator + { + + private readonly StreamMergeContext _mergeContext; + private readonly IEnumerable> _enumerators; + private readonly PriorityQueue> _queue; + private IStreamMergeEnumerator _currentEnumerator; + private bool skipFirst; + + public MultiOrderStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> enumerators) + { + _mergeContext = mergeContext; + _enumerators = enumerators; + _queue = new PriorityQueue>(enumerators.Count()); + skipFirst = true; + SetOrderEnumerator(); + } + + private void SetOrderEnumerator() + { + foreach (var source in _enumerators) + { + var orderStreamEnumerator = new OrderStreamMergeEnumerator(_mergeContext, source); + if (orderStreamEnumerator.HasElement()) + { + orderStreamEnumerator.SkipFirst(); + _queue.Offer(orderStreamEnumerator); + } + } + + _currentEnumerator = _queue.IsEmpty() ? _enumerators.FirstOrDefault() : _queue.Peek(); + } + public bool MoveNext() + { + if (_queue.IsEmpty()) + return false; + if (skipFirst) + { + skipFirst = false; + return true; + } + + var first = _queue.Poll(); + + if (first.MoveNext()) + { + _queue.Offer(first); + } + + if (_queue.IsEmpty()) + { + return false; + } + + _currentEnumerator = _queue.Peek(); + return true; + } + + public void Reset() + { + throw new System.NotImplementedException(); + } + + object IEnumerator.Current => Current; + + + public bool SkipFirst() + { + if (skipFirst) + { + skipFirst = false; + return true; + } + return false; + } + + public bool HasElement() + { + return _currentEnumerator != null && _currentEnumerator.HasElement(); + } + + public T ReallyCurrent => _queue.IsEmpty() ? default(T) : _queue.Peek().ReallyCurrent; + + + public void Dispose() + { + foreach (var enumerator in _enumerators) + { + enumerator?.Dispose(); + } + } + + + public T Current => skipFirst ? default : _currentEnumerator.Current; + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs new file mode 100644 index 00000000..0dc1b799 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/OrderStreamMergeEnumerator.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using ShardingCore.Core.Internal.StreamMerge; +using ShardingCore.Extensions; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ +/* +* @Author: xjm +* @Description: +* @Date: Sunday, 15 August 2021 06:46:32 +* @Email: 326308290@qq.com +*/ + public class OrderStreamMergeEnumerator:IOrderStreamMergeEnumerator + { + + /// + /// 合并数据上下文 + /// + private readonly StreamMergeContext _mergeContext; + + private readonly IStreamMergeEnumerator _enumerator; + private List _orderValues; + + public OrderStreamMergeEnumerator(StreamMergeContext mergeContext, IStreamMergeEnumerator enumerator) + { + _mergeContext = mergeContext; + _enumerator = enumerator; + SetOrderValues(); + } + + private void SetOrderValues() + { + _orderValues = HasElement() ? GetCurrentOrderValues() : new List(0); + } + + + public bool MoveNext() + { + var has = _enumerator.MoveNext(); + SetOrderValues(); + return has; + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + + public T Current =>_enumerator.Current; + + public bool SkipFirst() + { + return _enumerator.SkipFirst(); + } + + public bool HasElement() + { + return _enumerator.HasElement(); + } + + public T ReallyCurrent => _enumerator.ReallyCurrent; + + private List GetCurrentOrderValues() + { + if (!_mergeContext.Orders.Any()) + return new List(0); + var list = new List(_mergeContext.Orders.Count()); + foreach (var order in _mergeContext.Orders) + { + var value = _enumerator.ReallyCurrent.GetValueByExpression(order.PropertyExpression); + if (value is IComparable comparable) + list.Add(comparable); + else + throw new NotSupportedException($"order by value [{order}] must implements IComparable"); + } + + return list; + } + + public int CompareTo(IOrderStreamMergeEnumerator other) + { + int i = 0; + foreach (var order in _mergeContext.Orders) { + int result = CompareHelper.CompareToWith(_orderValues[i], other.GetCompares()[i], order.IsAsc); + if (0 != result) { + return result; + } + i++; + } + return 0; + } + + public List GetCompares() + { + return _orderValues ?? new List(0); + } + + public void Dispose() + { + _enumerator?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs new file mode 100644 index 00000000..a5274044 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/PaginationStreamMergeEnumerator.cs @@ -0,0 +1,85 @@ +using System.Collections; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ + /* + * @Author: xjm + * @Description: + * @Date: Sunday, 15 August 2021 06:39:52 + * @Email: 326308290@qq.com + */ + public class PaginationStreamMergeEnumerator : IStreamMergeEnumerator + { + private readonly StreamMergeContext _mergeContext; + private readonly IStreamMergeEnumerator _enumerator; + private readonly int? _skip; + private readonly int? _take; + private int realSkip = 0; + private int realTake = 0; + + public PaginationStreamMergeEnumerator(StreamMergeContext mergeContext, IEnumerable> sources) + { + _mergeContext = mergeContext; + _skip = mergeContext.Skip; + _take = mergeContext.Take; + if (_mergeContext.HasGroupQuery()) + _enumerator = new MultiAggregateOrderStreamMergeEnumerator(_mergeContext, sources); + else + _enumerator = new MultiOrderStreamMergeEnumerator(_mergeContext, sources); + } + + public bool MoveNext() + { + //如果合并数据的时候不需要跳过也没有take多少那么就是直接next + while (_skip.GetValueOrDefault() > this.realSkip) + { + var has = _enumerator.MoveNext(); + + realSkip++; + if (!has) + return false; + } + + var next = _enumerator.MoveNext(); + + if (next) + { + if (_take.HasValue) + { + realTake++; + if (realTake >= _take.Value) + return false; + } + } + + return next; + } + + public void Reset() + { + throw new System.NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => _enumerator.Current; + public bool SkipFirst() + { + return _enumerator.SkipFirst(); + } + + public bool HasElement() + { + return _enumerator.HasElement(); + } + + public T ReallyCurrent => _enumerator.ReallyCurrent; + + public void Dispose() + { + _enumerator?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs new file mode 100644 index 00000000..bb5c9596 --- /dev/null +++ b/src/ShardingCore/Sharding/Enumerators/StreamMergeSync/StreamMergeEnumerator.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections; +using System.Collections.Generic; + +namespace ShardingCore.Sharding.Enumerators.StreamMergeSync +{ +/* +* @Author: xjm +* @Description: +* @Date: Saturday, 14 August 2021 21:25:50 +* @Email: 326308290@qq.com +*/ + public class StreamMergeEnumerator:IStreamMergeEnumerator + { + private readonly IEnumerator _source; + private bool skip; + + public StreamMergeEnumerator(IEnumerator source) + { + _source = source; + skip = true; + } + + public bool MoveNext() + { + if (skip) + { + skip = false; + return null != _source.Current; + } + return _source.MoveNext(); + } + + public void Reset() + { + throw new NotImplementedException(); + } + + object IEnumerator.Current => Current; + + public T Current => skip?default:_source.Current; + public bool SkipFirst() + { + if (skip) + { + skip = false; + return true; + } + return false; + } + + public bool HasElement() + { + return null != _source.Current; + } + + public T ReallyCurrent => _source.Current; + public void Dispose() + { + _source?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs index d8b53a3d..dee0a25c 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractEnsureMethodCallInMemoryAsyncMergeEngine.cs @@ -16,11 +16,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - public abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine : AbstractInMemoryAsyncMergeEngine,IEnsureAsyncMergeResult + public abstract class AbstractEnsureMethodCallInMemoryAsyncMergeEngine : AbstractInMemoryAsyncMergeEngine,IEnsureMergeResult { protected AbstractEnsureMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } + + public abstract TResult MergeResult(); + public abstract Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs index 93fb089a..bac73fa1 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractGenericMethodCallInMemoryAsyncMergeEngine.cs @@ -16,13 +16,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - public abstract class AbstractGenericMethodCallInMemoryAsyncMergeEngine : AbstractInMemoryAsyncMergeEngine, IGenericAsyncMergeResult + public abstract class AbstractGenericMethodCallInMemoryAsyncMergeEngine : AbstractInMemoryAsyncMergeEngine, IGenericMergeResult { protected AbstractGenericMethodCallInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } + + public abstract TResult MergeResult(); + public abstract Task MergeResultAsync( CancellationToken cancellationToken = new CancellationToken()); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs index db10fef6..d636b0f7 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/AbstractInMemoryAsyncMergeEngine.cs @@ -5,8 +5,10 @@ using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine; using ShardingCore.Exceptions; using ShardingCore.Extensions; +using ShardingCore.Helpers; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Enumerators; @@ -53,28 +55,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions var tableResult = GetStreamMergeContext().GetRouteResults(); var enumeratorTasks = tableResult.Select(routeResult => { - if (routeResult.ReplaceTables.Count > 1) - throw new ShardingCoreException("route found more than 1 table name s"); - var tail = string.Empty; - if (routeResult.ReplaceTables.Count == 1) - tail = routeResult.ReplaceTables.First().Tail; + var tail = CheckAndGetTail(routeResult); return Task.Run(async () => { try { - //using (var scope = _mergeContext.CreateScope()) - //{ - //var shardingContext = ShardingContext.Create(routeResult); - //scope.ShardingAccessor.ShardingContext = shardingContext; - var shardingDbContext = GetStreamMergeContext().CreateDbContext(tail); var newQueryable = (IQueryable)GetStreamMergeContext().GetReWriteQueryable() .ReplaceDbContextQueryable(shardingDbContext); var newFilterQueryable=EFQueryAfterFilter(newQueryable); var query = await efQuery(newFilterQueryable); return query; - //} } catch (Exception e) { @@ -86,6 +78,42 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions return (await Task.WhenAll(enumeratorTasks)).ToList(); } + public List Execute(Func efQuery, CancellationToken cancellationToken = new CancellationToken()) + { + var tableResult = GetStreamMergeContext().GetRouteResults(); + var enumeratorTasks = tableResult.Select(routeResult => + { + var tail = CheckAndGetTail(routeResult); + + return Task.Run( () => + { + try + { + var shardingDbContext = GetStreamMergeContext().CreateDbContext(tail); + var newQueryable = (IQueryable)GetStreamMergeContext().GetReWriteQueryable() + .ReplaceDbContextQueryable(shardingDbContext); + var newFilterQueryable = EFQueryAfterFilter(newQueryable); + var query = efQuery(newFilterQueryable); + return query; + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + return Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult().ToList(); + } + private string CheckAndGetTail(RouteResult routeResult) + { + if (routeResult.ReplaceTables.Count > 1) + throw new ShardingCoreException("route found more than 1 table name s"); + var tail = string.Empty; + if (routeResult.ReplaceTables.Count == 1) + tail = routeResult.ReplaceTables.First().Tail; + return tail; + } public virtual IQueryable EFQueryAfterFilter(IQueryable queryable) { diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureAsyncMergeResult.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs similarity index 63% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureAsyncMergeResult.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs index 73483e6c..c069dc3f 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureAsyncMergeResult.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IEnsureMergeResult.cs @@ -13,10 +13,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - public interface IEnsureAsyncMergeResult + /// + /// 确认结果的合并 + /// + /// 返回的确认结果类型 + public interface IEnsureMergeResult { + /// + /// 合并结果 + /// + /// + T MergeResult(); /// - /// + /// 合并结果 /// /// /// diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericAsyncMergeResult.cs b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs similarity index 58% rename from src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericAsyncMergeResult.cs rename to src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs index 1737d3a0..1eb1a928 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericAsyncMergeResult.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/Abstractions/IGenericMergeResult.cs @@ -13,11 +13,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions * @Ver: 1.0 * @Email: 326308290@qq.com */ - public interface IGenericAsyncMergeResult + /// + /// 非确认结果的合并 + /// + public interface IGenericMergeResult { /// - /// + /// 合并结果 /// + /// 结果类型 + /// + TResult MergeResult(); + /// + /// 合并结果 + /// + /// 结果类型 /// /// Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs index 14f2823f..176c2009 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs @@ -30,46 +30,158 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { } + public override TEnsureResult MergeResult() + { + if (typeof(decimal) == typeof(TEnsureResult)) + { + var result = base.Execute(queryable => ((IQueryable)queryable).Average()); + if (result.IsEmpty()) + return default; + var average = result.Sum() / result.Count; + return ConvertSum(average); + } + + if (typeof(decimal?) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(int) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum() / result.Count; + return ConvertSum(average); + } + + if (typeof(int?) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(long) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum() / result.Count; + return ConvertSum(average); + } + + if (typeof(long?) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(double) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + var average = result.Sum() / result.Count; + return ConvertSum(average); + } + + if (typeof(double?) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(float) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum() / result.Count; + return ConvertSum(average); + } + + if (typeof(float?) == typeof(TEnsureResult)) + { + var result = base.Execute( + queryable => ((IQueryable)queryable).Average() + ); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + throw new ShardingCoreException( + $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); + } + public override async Task MergeResultAsync( CancellationToken cancellationToken = new CancellationToken()) { if (typeof(decimal) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum()/result.Count; + var average = result.Sum() / result.Count; return ConvertSum(average); } if (typeof(decimal?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum().HasValue ?result.Sum()/result.Count: default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; return ConvertSum(average); } if (typeof(int) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum()/result.Count; + var average = result.Sum() / result.Count; return ConvertSum(average); } if (typeof(int?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; @@ -80,18 +192,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines if (typeof(long) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum()/result.Count; + var average = result.Sum() / result.Count; return ConvertSum(average); } if (typeof(long?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; @@ -102,16 +214,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines if (typeof(double) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); - var average = result.Sum()/result.Count; + var average = result.Sum() / result.Count; return ConvertSum(average); } if (typeof(double?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; @@ -122,18 +234,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines if (typeof(float) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; - var average = result.Sum()/result.Count; + var average = result.Sum() / result.Count; return ConvertSum(average); } if (typeof(float?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync( - async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + queryable => ((IQueryable)queryable).AverageAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs index a066b0fc..5acd1230 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MaxAsyncInMemoryMergeEngine.cs @@ -25,9 +25,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Max()); + return result.Max(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).MaxAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).MaxAsync(cancellationToken), cancellationToken); return result.Max(); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs index ac4a18f5..647139ee 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/MinAsyncInMemoryMergeEngine.cs @@ -26,9 +26,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Min()); + return result.Min(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).MinAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).MinAsync(cancellationToken), cancellationToken); return result.Min(); } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs index 26a31f42..896d7ba2 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs @@ -28,11 +28,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines { } - public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + public override TEnsureResult MergeResult() { - if(typeof(decimal)==typeof(TEnsureResult)) + if (typeof(decimal) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -40,7 +40,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(decimal?) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -48,7 +48,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(int) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -56,7 +56,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(int?) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -64,7 +64,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(long) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -72,7 +72,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(long?) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -80,7 +80,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(double) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -88,7 +88,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(double?) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -96,7 +96,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(float) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); if (result.IsEmpty()) return default; var sum = result.Sum(); @@ -104,7 +104,94 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines } if (typeof(float?) == typeof(TEnsureResult)) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + var result = base.Execute(queryable => ((IQueryable)queryable).Sum()); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + + throw new ShardingCoreException( + $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); + } + + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) + { + if (typeof(decimal) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(decimal?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(int) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(int?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(long) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(long?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(double) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(double?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(float) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; + var sum = result.Sum(); + return ConvertSum(sum); + } + if (typeof(float?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync(queryable => ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); if (result.IsEmpty()) return default; var sum = result.Sum(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs index f50f9401..20c7c9b2 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AllAsyncInMemoryMergeEngine.cs @@ -27,10 +27,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override bool MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Any()); + + return result.All(o => o); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); return result.All(o => o); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs index 71a2214e..7205749e 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AnyAsyncInMemoryMergeEngine.cs @@ -26,9 +26,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines public AnyAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } + + public override bool MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Any()); + + return result.Any(o => o); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).AnyAsync(cancellationToken), cancellationToken); return result.Any(o => o); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs index aef6cb6d..96672780 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AsyncEnumerableStreamMergeEngine.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -8,6 +9,7 @@ using ShardingCore.Exceptions; using ShardingCore.Extensions; using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators.StreamMergeAsync; +using ShardingCore.Sharding.Enumerators.StreamMergeSync; namespace ShardingCore.Sharding.StreamMergeEngines { @@ -17,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines * @Date: Saturday, 14 August 2021 22:07:28 * @Email: 326308290@qq.com */ - public class AsyncEnumerableStreamMergeEngine :IAsyncEnumerable + public class AsyncEnumerableStreamMergeEngine :IAsyncEnumerable,IEnumerable { private readonly StreamMergeContext _mergeContext; @@ -131,5 +133,60 @@ namespace ShardingCore.Sharding.StreamMergeEngines return new MultiAggregateOrderStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators); return new MultiOrderStreamMergeAsyncEnumerator(_mergeContext, streamEnumerators); } + + private IEnumerator GetEnumerator(IQueryable newQueryable) + { + var enumator = newQueryable.AsEnumerable().GetEnumerator(); + enumator.MoveNext(); + return enumator; + } + public IEnumerator GetEnumerator() + { + var tableResult = _mergeContext.GetRouteResults(); + var enumeratorTasks = tableResult.Select(routeResult => + { + if (routeResult.ReplaceTables.Count > 1) + throw new ShardingCoreException("route found more than 1 table name s"); + var tail = string.Empty; + if (routeResult.ReplaceTables.Count == 1) + tail = routeResult.ReplaceTables.First().Tail; + + return Task.Run( () => + { + try + { + //using (var scope = _mergeContext.CreateScope()) + //{ + //var shardingContext = ShardingContext.Create(routeResult); + //scope.ShardingAccessor.ShardingContext = shardingContext; + + var shardingDbContext = _mergeContext.CreateDbContext(tail); + var newQueryable = (IQueryable)_mergeContext.GetReWriteQueryable() + .ReplaceDbContextQueryable(shardingDbContext); + + var enumerator = GetEnumerator(newQueryable); + return new StreamMergeEnumerator(enumerator); + //} + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } + }); + }).ToArray(); + + var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); + if (_mergeContext.HasSkipTake()) + return new PaginationStreamMergeEnumerator(_mergeContext, streamEnumerators); + if (_mergeContext.HasGroupQuery()) + return new MultiAggregateOrderStreamMergeEnumerator(_mergeContext, streamEnumerators); + return new MultiOrderStreamMergeEnumerator(_mergeContext, streamEnumerators); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } } } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs index cfd519d6..ae7fb0ff 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/ContainsAsyncInMemoryMergeEngine.cs @@ -23,9 +23,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override bool MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Contains(GetConstantItem())); + + return result.Any(o => o); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).ContainsAsync(GetConstantItem(), cancellationToken), cancellationToken); return result.Any(o => o); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs index c922d69e..e938f766 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/CountAsyncInMemoryMergeEngine.cs @@ -21,9 +21,17 @@ namespace ShardingCore.Sharding.StreamMergeEngines public CountAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) { } + + public override int MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Count()); + + return result.Sum(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).CountAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).CountAsync(cancellationToken), cancellationToken); return result.Sum(); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs index a6e126be..a7ecadaa 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstAsyncInMemoryMergeEngine.cs @@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute(queryable => ((IQueryable)queryable).First()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).First(); + + return q.First(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).FirstAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs index 44ff5beb..a2f7947b 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/FirstOrDefaultAsyncInMemoryMergeEngine.cs @@ -29,9 +29,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).FirstOrDefault()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).FirstOrDefault(); + + return q.FirstOrDefault(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs index 3494e177..e0b6e6be 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastAsyncInMemoryMergeEngine.cs @@ -26,10 +26,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Last()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).Last(); + + return q.Last(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).LastAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs index 1c61b182..97dcf284 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LastOrDefaultAsyncInMemoryMergeEngine.cs @@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).LastOrDefault()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).LastOrDefault(); + + return q.LastOrDefault(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs index 59f50f6e..f6c1f5a0 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/LongCountAsyncInMemoryMergeEngine.cs @@ -28,10 +28,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override long MergeResult() + { + + var result = base.Execute( queryable => ((IQueryable)queryable).LongCount()); + + return result.Sum(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).LongCountAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).LongCountAsync(cancellationToken), cancellationToken); return result.Sum(); } diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs index bd5c3328..40251fb1 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleAsyncInMemoryMergeEngine.cs @@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute( queryable => ((IQueryable)queryable).Single()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).Single(); + + return q.Single(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SingleAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs index ba7aba86..88c19251 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/SingleOrDefaultAsyncInMemoryMergeEngine.cs @@ -26,9 +26,21 @@ namespace ShardingCore.Sharding.StreamMergeEngines { } + public override TResult MergeResult() + { + var result = base.Execute(queryable => ((IQueryable)queryable).SingleOrDefault()); + var q = result.Where(o => o != null).AsQueryable(); + + var streamMergeContext = GetStreamMergeContext(); + if (streamMergeContext.Orders.Any()) + return q.OrderWithExpression(streamMergeContext.Orders).SingleOrDefault(); + + return q.SingleOrDefault(); + } + public override async Task MergeResultAsync(CancellationToken cancellationToken = new CancellationToken()) { - var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken); + var result = await base.ExecuteAsync( queryable => ((IQueryable)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken); var q = result.Where(o => o != null).AsQueryable(); var streamMergeContext = GetStreamMergeContext(); diff --git a/src/ShardingCore/ShardingCore.csproj b/src/ShardingCore/ShardingCore.csproj index 5fc22737..c5b47284 100644 --- a/src/ShardingCore/ShardingCore.csproj +++ b/src/ShardingCore/ShardingCore.csproj @@ -18,8 +18,4 @@ - - - -