diff --git a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs index 2709205b..43fecbd3 100644 --- a/src/ShardingCore/EFCores/ShardingQueryCompiler.cs +++ b/src/ShardingCore/EFCores/ShardingQueryCompiler.cs @@ -120,14 +120,15 @@ namespace ShardingCore.EFCores return EnsureMergeExecuteAsync(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken); case nameof(Enumerable.Sum): return EnsureMergeExecuteAsync2(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken); - + case nameof(Enumerable.Average): + return EnsureMergeExecuteAsync2(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken); } } - return default; + } - throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]"); + throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]"); //IQueryable queryable = new EnumerableQuery(expression); //var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext); diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs new file mode 100644 index 00000000..2dd665ff --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/AverageAsyncInMemoryMergeEngine.cs @@ -0,0 +1,155 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using ShardingCore.Exceptions; +using ShardingCore.Extensions; +using ShardingCore.Sharding.Abstractions; +using ShardingCore.Sharding.StreamMergeEngines.Abstractions; + +namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/8/18 22:15:04 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + public class AverageAsyncInMemoryMergeEngine : + AbstractEnsureMethodCallSelectorInMemoryAsyncMergeEngine + { + public AverageAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, + IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) + { + } + + public override async Task MergeResultAsync( + CancellationToken cancellationToken = new CancellationToken()) + { + if (typeof(decimal) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum()/result.Count; + return ConvertSum(average); + } + + if (typeof(decimal?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ?result.Sum()/result.Count: default; + return ConvertSum(average); + } + + if (typeof(int) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum()/result.Count; + return ConvertSum(average); + } + + if (typeof(int?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(long) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum()/result.Count; + return ConvertSum(average); + } + + if (typeof(long?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(double) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + var average = result.Sum()/result.Count; + return ConvertSum(average); + } + + if (typeof(double?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + if (typeof(float) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum()/result.Count; + return ConvertSum(average); + } + + if (typeof(float?) == typeof(TEnsureResult)) + { + var result = await base.ExecuteAsync( + async queryable => await ((IQueryable) queryable).AverageAsync(cancellationToken), + cancellationToken); + if (result.IsEmpty()) + return default; + var average = result.Sum().HasValue ? result.Sum() / result.Count : default; + return ConvertSum(average); + } + + throw new ShardingCoreException( + $"not support {GetMethodCallExpression().Print()} result {typeof(TEnsureResult)}"); + } + + private TEnsureResult ConvertSum(TNumber number) + { + if (number == null) + return default; + var convertExpr = Expression.Convert(Expression.Constant(number), typeof(TEnsureResult)); + return Expression.Lambda>(convertExpr).Compile()(); + } + } +} \ No newline at end of file diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/ContainsAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/ContainsAsyncInMemoryMergeEngine.cs new file mode 100644 index 00000000..2ac4a9da --- /dev/null +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/ContainsAsyncInMemoryMergeEngine.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines +{ + /* + * @Author: xjm + * @Description: + * @Date: 2021/8/18 22:30:07 + * @Ver: 1.0 + * @Email: 326308290@qq.com + */ + class ContainsAsyncInMemoryMergeEngine + { + } +} diff --git a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs index 454e9022..d13f9e91 100644 --- a/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs +++ b/src/ShardingCore/Sharding/StreamMergeEngines/AggregateMergeEngines/SumAsyncInMemoryMergeEngine.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using ShardingCore.Exceptions; +using ShardingCore.Extensions; using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.StreamMergeEngines.Abstractions; @@ -31,60 +32,80 @@ namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines if(typeof(decimal)==typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(decimal?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(int) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(int?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(long) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(long?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(double) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(double?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(float) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); } if (typeof(float?) == typeof(TEnsureResult)) { var result = await base.ExecuteAsync(async queryable => await ((IQueryable)queryable).SumAsync(cancellationToken), cancellationToken); + if (result.IsEmpty()) + return default; var sum = result.Sum(); return ConvertSum(sum); }