非常完美

This commit is contained in:
xuejiaming 2021-08-18 14:09:56 +08:00
parent 0558a4d8ce
commit b26bd41a6f
13 changed files with 395 additions and 63 deletions

View File

@ -26,8 +26,10 @@ namespace Sample.SqlServer.Controllers
[HttpGet] [HttpGet]
public async Task<IActionResult> Get() public async Task<IActionResult> Get()
{ {
var resultx2 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Age<=10).CountAsync(); var resultx111 = await _defaultTableDbContext.Set<SysUserMod>().FirstOrDefaultAsync(o => o.Id == "198");
var resultx2 = await _defaultTableDbContext.Set<SysUserMod>().CountAsync(o => o.Age<=10);
var resultx = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefaultAsync(); var resultx = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefaultAsync();
var resultx33 = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").Select(o=>o.Id).FirstOrDefaultAsync();
var result = await _defaultTableDbContext.Set<SysUserMod>().ToListAsync(); var result = await _defaultTableDbContext.Set<SysUserMod>().ToListAsync();
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98"); var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");

View File

@ -21,6 +21,7 @@ using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions; using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators; using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines; using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
namespace ShardingCore.EFCores namespace ShardingCore.EFCores
{ {
@ -89,18 +90,19 @@ namespace ShardingCore.EFCores
{ {
if (query is MethodCallExpression methodCallExpression) if (query is MethodCallExpression methodCallExpression)
{ {
var rootQuery = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type));
if (rootQuery == null)
throw new ShardingCoreException("expression error");
var returnEntityType = query.Type;
var queryEntityType = rootQuery.Type.GetSequenceType();
switch (methodCallExpression.Method.Name) switch (methodCallExpression.Method.Name)
{ {
case nameof(Enumerable.FirstOrDefault): case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, returnEntityType, queryEntityType, rootQuery, cancellationToken); return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.Count): case nameof(Enumerable.Count):
return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, queryEntityType, rootQuery, cancellationToken); return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.LongCount):
return EnsureMergeExecuteAsync<TResult>(typeof(LongCountAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.Any):
return EnsureMergeExecuteAsync<TResult>(typeof(AnyAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, methodCallExpression, cancellationToken);
case nameof(Enumerable.All):
return EnsureMergeExecuteAsync<TResult>(typeof(AllAsyncInMemoryAsyncStreamMergeEngine<>),shardingDbContext, methodCallExpression, cancellationToken);
} }
} }
@ -121,46 +123,65 @@ namespace ShardingCore.EFCores
private TResult GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext,Type returnEntityType, Type queryEntityType, Expression query, CancellationToken cancellationToken) private TResult GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
{ {
Type type = typeof(EnumerableQuery<>); //Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType); //type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query); //var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
//var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
var queryEntityType = query.GetQueryEntityType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod("DoExecuteAsync"); var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IGenericAsyncMergeResult.MergeResultAsync));
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamEngine, new object[] { cancellationToken });
}
private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
{
//Type type = typeof(EnumerableQuery<>);
//type = type.MakeGenericType(queryEntityType);
//var queryable = Activator.CreateInstance(type, query);
//var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
//if (streamMergeContextMethod == null)
// throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
//var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync));
if (streamEngineMethod == null) if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]"); throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken }); return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
} }
//private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
//{
// //Type type = typeof(EnumerableQuery<>);
// //type = type.MakeGenericType(queryEntityType);
// //var queryable = Activator.CreateInstance(type, query);
private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, Type queryEntityType, Expression query, CancellationToken cancellationToken) // //var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
{ // //if (streamMergeContextMethod == null)
// // throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
// //var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
Type type = typeof(EnumerableQuery<>); // streamMergeEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
type = type.MakeGenericType(queryEntityType); // var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var queryable = Activator.CreateInstance(type, query); // var streamEngineMethod = streamMergeEngineType.GetMethod(nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync));
// if (streamEngineMethod == null)
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create"); // throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
if (streamMergeContextMethod == null) // return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]"); //}
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var streamEngineMethod = streamMergeEngineType.GetMethod("DoExecuteAsync");
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [DoExecuteAsync]");
return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { cancellationToken });
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query) public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
{ {

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using ShardingCore.Exceptions;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 12:54:33
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public static class ShardingComplierExtension
{
public static Type GetQueryEntityType(this MethodCallExpression expression)
{
var rootQuery = expression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type));
if (rootQuery == null)
throw new ShardingCoreException("expression error");
return rootQuery.Type.GetSequenceType();
}
}
}

View File

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 13:44:02
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine<TEntity, TResult> : AbstractInMemoryAsyncStreamMergeEngine<TEntity>,IEnsureAsyncMergeResult<TResult>
{
private readonly StreamMergeContext<TEntity> _mergeContext;
private readonly IQueryable<TEntity> _queryable;
public AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type)) ?? throw new InvalidOperationException(methodCallExpression.Print());
_queryable = new EnumerableQuery<TEntity>(expression);
var predicate = methodCallExpression.Arguments.FirstOrDefault(o => o is UnaryExpression);
if (predicate != null)
{
_queryable = _queryable.Where((Expression<Func<TEntity, bool>>)((UnaryExpression)predicate).Operand);
}
else
{
if (methodCallExpression.Arguments.Count == 2)
throw new InvalidOperationException(methodCallExpression.Print());
}
_mergeContext = ShardingContainer.GetService<IStreamMergeContextFactory>().Create(_queryable, shardingDbContext);
}
public abstract Task<TResult> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken());
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
{
return _mergeContext;
}
protected IQueryable<TEntity> GetQueryable()
{
return _queryable;
}
}
}

View File

@ -0,0 +1,59 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 14:04:07
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractGenericMethodCallInMemoryAsyncStreamMergeEngine<TEntity> : AbstractInMemoryAsyncStreamMergeEngine<TEntity>, IGenericAsyncMergeResult
{
private readonly StreamMergeContext<TEntity> _mergeContext;
private readonly IQueryable<TEntity> _queryable;
public AbstractGenericMethodCallInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext)
{
var expression = methodCallExpression.Arguments.FirstOrDefault(o => typeof(IQueryable).IsAssignableFrom(o.Type)) ?? throw new InvalidOperationException(methodCallExpression.Print());
_queryable = new EnumerableQuery<TEntity>(expression);
var predicate = methodCallExpression.Arguments.FirstOrDefault(o => o is UnaryExpression);
if (predicate != null)
{
_queryable = _queryable.Where((Expression<Func<TEntity, bool>>)((UnaryExpression)predicate).Operand);
}
else
{
if (methodCallExpression.Arguments.Count == 2)
throw new InvalidOperationException(methodCallExpression.Print());
}
_mergeContext = ShardingContainer.GetService<IStreamMergeContextFactory>().Create(_queryable, shardingDbContext);
}
public abstract Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
protected override StreamMergeContext<TEntity> GetStreamMergeContext()
{
return _mergeContext;
}
protected IQueryable<TEntity> GetQueryable()
{
return _queryable;
}
}
}

View File

@ -21,16 +21,15 @@ namespace ShardingCore.Sharding.StreamMergeEngines
*/ */
public abstract class AbstractInMemoryAsyncStreamMergeEngine<T> public abstract class AbstractInMemoryAsyncStreamMergeEngine<T>
{ {
private readonly StreamMergeContext<T> _mergeContext; /// <summary>
/// 获取流失合并上下文
public AbstractInMemoryAsyncStreamMergeEngine(StreamMergeContext<T> mergeContext) /// </summary>
{ /// <returns></returns>
_mergeContext = mergeContext; protected abstract StreamMergeContext<T> GetStreamMergeContext();
}
public async Task<List<TResult>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery,CancellationToken cancellationToken = new CancellationToken()) public async Task<List<TResult>> ExecuteAsync<TResult>(Func<IQueryable, Task<TResult>> efQuery,CancellationToken cancellationToken = new CancellationToken())
{ {
var tableResult = _mergeContext.GetRouteResults(); var tableResult = GetStreamMergeContext().GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult => var enumeratorTasks = tableResult.Select(routeResult =>
{ {
if (routeResult.ReplaceTables.Count > 1) if (routeResult.ReplaceTables.Count > 1)
@ -48,8 +47,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines
//var shardingContext = ShardingContext.Create(routeResult); //var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext; //scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail); var shardingDbContext = GetStreamMergeContext().CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable() var newQueryable = (IQueryable<T>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
var query = await efQuery(newQueryable); var query = await efQuery(newQueryable);
return query; return query;

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 13:47:34
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IEnsureAsyncMergeResult<T>: IEnsureAsyncMergeResult
{
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<T> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 13:47:57
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public interface IGenericAsyncMergeResult
{
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 13:39:51
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AllAsyncInMemoryAsyncStreamMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine<TEntity, bool>
{
public AllAsyncInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.All(o => o);
}
}
}

View File

@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 13:37:00
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AnyAsyncInMemoryAsyncStreamMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine<TEntity,bool>
{
public AnyAsyncInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override async Task<bool> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).AnyAsync(cancellationToken), cancellationToken);
return result.Any(o=>o);
}
}
}

View File

@ -1,11 +1,16 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Linq.Expressions;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines namespace ShardingCore.Sharding.StreamMergeEngines
{ {
@ -16,17 +21,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class CountAsyncInMemoryAsyncStreamMergeEngine<TResult> : AbstractInMemoryAsyncStreamMergeEngine<TResult> public class CountAsyncInMemoryAsyncStreamMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine<TEntity,int>
{ {
private readonly StreamMergeContext<TResult> _mergeContext; public CountAsyncInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
public CountAsyncInMemoryAsyncStreamMergeEngine(StreamMergeContext<TResult> mergeContext) : base(mergeContext)
{ {
_mergeContext = mergeContext;
} }
public async Task<int> DoExecuteAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task<int> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync(async iqueryable => await EntityFrameworkQueryableExtensions.CountAsync((IQueryable<TResult>)iqueryable, cancellationToken), cancellationToken); var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).CountAsync(cancellationToken), cancellationToken);
return result.Sum(); return result.Sum();
} }

View File

@ -1,11 +1,16 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Linq.Expressions;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines namespace ShardingCore.Sharding.StreamMergeEngines
{ {
@ -16,23 +21,22 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<TResult>:AbstractInMemoryAsyncStreamMergeEngine<TResult> public class FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<TEntity> :AbstractGenericMethodCallInMemoryAsyncStreamMergeEngine<TEntity>
{ {
private readonly StreamMergeContext<TResult> _mergeContext; public FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
public FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine(StreamMergeContext<TResult> mergeContext) : base(mergeContext)
{ {
_mergeContext = mergeContext;
} }
public async Task<TResult> DoExecuteAsync(CancellationToken cancellationToken = new CancellationToken())
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync(async iqueryable=> await EntityFrameworkQueryableExtensions.FirstOrDefaultAsync((IQueryable<TResult>)iqueryable, cancellationToken), cancellationToken); var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null).AsQueryable(); var q = result.Where(o => o != null).AsQueryable();
if (_mergeContext.Orders.Any())
return q.OrderWithExpression(_mergeContext.Orders).FirstOrDefault(); var streamMergeContext = GetStreamMergeContext();
if (streamMergeContext.Orders.Any())
return q.OrderWithExpression(streamMergeContext.Orders).FirstOrDefault();
return q.FirstOrDefault(); return q.FirstOrDefault();
} }
} }
} }

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/18 6:34:00
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class LongCountAsyncInMemoryAsyncStreamMergeEngine<TEntity> : AbstractEnsureMethodCallInMemoryAsyncStreamMergeEngine<TEntity,long>
{
public LongCountAsyncInMemoryAsyncStreamMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
}
public override async Task<long> MergeResultAsync(CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(async queryable => await ((IQueryable<TEntity>)queryable).LongCountAsync(cancellationToken), cancellationToken);
return result.Sum();
}
}
}