2021-08-17 22:17:18 +08:00
|
|
|
|
using System;
|
2021-01-26 10:18:49 +08:00
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
using System.Linq;
|
|
|
|
|
using System.Linq.Expressions;
|
|
|
|
|
using System.Reflection;
|
|
|
|
|
using System.Threading;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
using Microsoft.EntityFrameworkCore;
|
|
|
|
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
2021-08-17 22:17:18 +08:00
|
|
|
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
2021-01-26 10:18:49 +08:00
|
|
|
|
using Microsoft.EntityFrameworkCore.Internal;
|
2021-08-17 22:17:18 +08:00
|
|
|
|
using Microsoft.EntityFrameworkCore.Metadata;
|
2021-01-26 10:18:49 +08:00
|
|
|
|
using Microsoft.EntityFrameworkCore.Query;
|
|
|
|
|
using Microsoft.EntityFrameworkCore.Query.Internal;
|
|
|
|
|
using Microsoft.EntityFrameworkCore.Storage;
|
|
|
|
|
using Microsoft.Extensions.DependencyInjection;
|
2021-08-17 22:17:18 +08:00
|
|
|
|
using ShardingCore.Core.ShardingAccessors;
|
|
|
|
|
using ShardingCore.Exceptions;
|
|
|
|
|
using ShardingCore.Extensions;
|
|
|
|
|
using ShardingCore.Sharding;
|
|
|
|
|
using ShardingCore.Sharding.Abstractions;
|
|
|
|
|
using ShardingCore.Sharding.Enumerators;
|
|
|
|
|
using ShardingCore.Sharding.StreamMergeEngines;
|
2021-08-18 14:09:56 +08:00
|
|
|
|
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
|
2021-08-18 21:47:26 +08:00
|
|
|
|
using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines;
|
2021-01-26 10:18:49 +08:00
|
|
|
|
|
|
|
|
|
namespace ShardingCore.EFCores
|
|
|
|
|
{
|
|
|
|
|
/**
 * Description: IQueryCompiler implementation that routes asynchronous query
 * execution on an IShardingDbContext to the sharding stream-merge engines.
 * Synchronous execution and compiled queries are not implemented.
 *
 * Author:xuejiaming
 * Created: 2020/12/28 13:58:46
 **/
public class ShardingQueryCompiler : IQueryCompiler
{
    // NOTE: _queryContextFactory, _database, _logger and _model are stored by the
    // constructor but are not read anywhere in this class.
    private readonly IQueryContextFactory _queryContextFactory;
    private readonly IDatabase _database;
    private readonly IDiagnosticsLogger<DbLoggerCategory.Query> _logger;
    private readonly ICurrentDbContext _currentContext;
    private readonly IModel _model;
    private readonly IStreamMergeContextFactory _streamMergeContextFactory;

    /// <summary>
    /// Creates the compiler and stores the supplied services.
    /// </summary>
    /// <remarks>
    /// <paramref name="compiledQueryCache"/>, <paramref name="compiledQueryCacheKeyGenerator"/> and
    /// <paramref name="evaluatableExpressionFilter"/> are accepted but not used by this class
    /// (presumably kept so the signature matches what the service container supplies; confirm before removing).
    /// The <see cref="IStreamMergeContextFactory"/> is not injected; it is resolved through
    /// <c>ShardingContainer.GetService</c>.
    /// </remarks>
    public ShardingQueryCompiler(IQueryContextFactory queryContextFactory, ICompiledQueryCache compiledQueryCache, ICompiledQueryCacheKeyGenerator compiledQueryCacheKeyGenerator, IDatabase database, IDiagnosticsLogger<DbLoggerCategory.Query> logger, ICurrentDbContext currentContext, IEvaluatableExpressionFilter evaluatableExpressionFilter, IModel model)
    {
        _queryContextFactory = queryContextFactory;
        _database = database;
        _logger = logger;
        _currentContext = currentContext;
        _model = model;
        _streamMergeContextFactory = ShardingContainer.GetService<IStreamMergeContextFactory>();
    }

    /// <summary>
    /// Synchronous execution is not implemented.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public TResult Execute<TResult>(Expression query)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Executes <paramref name="query"/> against the current sharding context.
    /// </summary>
    /// <remarks>
    /// Two result shapes are handled:
    /// <list type="bullet">
    /// <item><description>a <typeparamref name="TResult"/> matching <c>IAsyncEnumerable&lt;&gt;</c> returns an
    /// <c>AsyncEnumerableStreamMergeEngine&lt;&gt;</c> built over the query;</description></item>
    /// <item><description>a <typeparamref name="TResult"/> matching <c>Task&lt;&gt;</c> whose expression is a
    /// method call is dispatched by method name to the corresponding in-memory merge engine.</description></item>
    /// </list>
    /// </remarks>
    /// <param name="query">The query expression to execute.</param>
    /// <param name="cancellationToken">Token forwarded to the merge engine's <c>MergeResultAsync</c> call.</param>
    /// <returns>The merge engine or merge result, cast to <typeparamref name="TResult"/>.</returns>
    /// <exception cref="ShardingCoreException">
    /// The current context is not an <c>IShardingDbContext</c>, or the query/result type combination is not supported.
    /// </exception>
    public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
    {
        if (!(_currentContext.Context is IShardingDbContext shardingDbContext))
        {
            throw new ShardingCoreException("db context operator is not IShardingDbContext");
        }

        if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
        {
            return CreateAsyncEnumerableMergeEngine<TResult>(query, shardingDbContext);
        }

        if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>)) && query is MethodCallExpression methodCallExpression)
        {
            switch (methodCallExpression.Method.Name)
            {
                case nameof(Enumerable.First):
                    return GenericMergeExecuteAsync<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.FirstOrDefault):
                    return GenericMergeExecuteAsync<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Last):
                    return GenericMergeExecuteAsync<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.LastOrDefault):
                    return GenericMergeExecuteAsync<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Single):
                    return GenericMergeExecuteAsync<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.SingleOrDefault):
                    return GenericMergeExecuteAsync<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Count):
                    return EnsureMergeExecuteAsync<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.LongCount):
                    return EnsureMergeExecuteAsync<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Any):
                    return EnsureMergeExecuteAsync<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.All):
                    return EnsureMergeExecuteAsync<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                // NOTE(review): Max goes through the generic path while Min goes through the ensure path.
                // The engine definitions are not visible here - confirm this asymmetry is intentional.
                case nameof(Enumerable.Max):
                    return GenericMergeExecuteAsync<TResult>(typeof(MaxAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Min):
                    return EnsureMergeExecuteAsync<TResult>(typeof(MinAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Sum):
                    return EnsureMergeExecuteAsync2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
                case nameof(Enumerable.Average):
                    return EnsureMergeExecuteAsync2<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, cancellationToken);
            }
        }

        // Reached for any result type / method name that was not handled above.
        throw new ShardingCoreException($"db context operator not support query expression:[{query.Print()}] result type:[{typeof(TResult).FullName}]");
    }

    /// <summary>
    /// Builds the stream merge context for <paramref name="query"/> and wraps it in an
    /// <c>AsyncEnumerableStreamMergeEngine&lt;&gt;</c> closed over the element type of <typeparamref name="TResult"/>.
    /// </summary>
    /// <exception cref="ShardingCoreException">The factory's runtime type exposes no <c>Create</c> method.</exception>
    private TResult CreateAsyncEnumerableMergeEngine<TResult>(Expression query, IShardingDbContext shardingDbContext)
    {
        // The element type is only known at run time, so every generic below is closed via reflection.
        var queryEntityType = typeof(TResult).GetGenericArguments()[0];
        var queryableType = typeof(EnumerableQuery<>).MakeGenericType(queryEntityType);
        var queryable = Activator.CreateInstance(queryableType, query);

        var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
        if (streamMergeContextMethod == null)
        {
            throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
        }

        var streamMergeContext = streamMergeContextMethod
            .MakeGenericMethod(queryEntityType)
            .Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });

        var engineType = typeof(AsyncEnumerableStreamMergeEngine<>).MakeGenericType(queryEntityType);
        return (TResult)Activator.CreateInstance(engineType, streamMergeContext);
    }

    /// <summary>
    /// Runs a single-type-argument merge engine whose <c>MergeResultAsync</c> is itself a generic method;
    /// both the engine type and the method are closed over the query entity type.
    /// </summary>
    private TResult GenericMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
    {
        var queryEntityType = query.GetQueryEntityType();
        var closedEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
        return InvokeMergeResultAsync<TResult>(closedEngineType, nameof(IGenericAsyncMergeResult.MergeResultAsync), queryEntityType, shardingDbContext, query, cancellationToken);
    }

    /// <summary>
    /// Runs a single-type-argument merge engine (closed over the query entity type) whose
    /// <c>MergeResultAsync</c> is invoked without method-level type arguments.
    /// </summary>
    private TResult EnsureMergeExecuteAsync<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
    {
        var closedEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType());
        return InvokeMergeResultAsync<TResult>(closedEngineType, nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync), null, shardingDbContext, query, cancellationToken);
    }

    /// <summary>
    /// Runs a two-type-argument merge engine, closed over the query entity type and the first generic
    /// type argument of <typeparamref name="TResult"/>; <c>MergeResultAsync</c> is invoked without
    /// method-level type arguments.
    /// </summary>
    private TResult EnsureMergeExecuteAsync2<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
    {
        var closedEngineType = streamMergeEngineType.MakeGenericType(query.GetQueryEntityType(), typeof(TResult).GetGenericArguments()[0]);
        return InvokeMergeResultAsync<TResult>(closedEngineType, nameof(IEnsureAsyncMergeResult<object>.MergeResultAsync), null, shardingDbContext, query, cancellationToken);
    }

    /// <summary>
    /// Instantiates <paramref name="closedEngineType"/> with <c>(query, shardingDbContext)</c> and invokes
    /// its merge method with the cancellation token.
    /// </summary>
    /// <param name="closedEngineType">Fully closed merge engine type.</param>
    /// <param name="mergeMethodName">Name of the merge method to look up on the engine type.</param>
    /// <param name="methodTypeArgument">
    /// Type argument used to close the merge method when it is generic; <c>null</c> to invoke it as found.
    /// </param>
    /// <param name="shardingDbContext">Sharding context passed to the engine constructor.</param>
    /// <param name="query">Query expression passed to the engine constructor.</param>
    /// <param name="cancellationToken">Token passed as the merge method's only argument.</param>
    /// <exception cref="ShardingCoreException">The engine type exposes no method with the requested name.</exception>
    private static TResult InvokeMergeResultAsync<TResult>(Type closedEngineType, string mergeMethodName, Type methodTypeArgument, IShardingDbContext shardingDbContext, MethodCallExpression query, CancellationToken cancellationToken)
    {
        var streamEngine = Activator.CreateInstance(closedEngineType, query, shardingDbContext);

        var mergeMethod = closedEngineType.GetMethod(mergeMethodName);
        if (mergeMethod == null)
        {
            // Report the type and method that were actually looked up.
            throw new ShardingCoreException($"cant found {closedEngineType.FullName} method [{mergeMethodName}]");
        }

        if (methodTypeArgument != null)
        {
            mergeMethod = mergeMethod.MakeGenericMethod(methodTypeArgument);
        }

        return (TResult)mergeMethod.Invoke(streamEngine, new object[] { cancellationToken });
    }

    /// <summary>
    /// Compiled synchronous queries are not implemented.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Compiled asynchronous queries are not implemented.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Func<QueryContext, TResult> CreateCompiledAsyncQuery<TResult>(Expression query)
    {
        throw new NotImplementedException();
    }
}
|
|
|
|
|
}
|