ExecuteUpdate ExecuteDelete完善
This commit is contained in:
parent
da8e43685d
commit
d3acc12e62
|
@ -0,0 +1,50 @@
|
|||
#if SHARDINGCORE7
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Query;
|
||||
using ShardingCore.Extensions.InternalExtensions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
|
||||
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
||||
{
|
||||
internal class ExecuteDeleteMethodExecutor<TEntity> : AbstractMethodWrapExecutor<int>
|
||||
{
|
||||
public ExecuteDeleteMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
}
|
||||
|
||||
public override ICircuitBreaker CreateCircuitBreaker()
|
||||
{
|
||||
var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext());
|
||||
circuitBreaker.Register(() =>
|
||||
{
|
||||
Cancel();
|
||||
});
|
||||
return circuitBreaker;
|
||||
}
|
||||
|
||||
public override IShardingMerger<RouteQueryResult<int>> GetShardingMerger()
|
||||
{
|
||||
return new CountMethodShardingMerger(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<int> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().ExecuteDeleteAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -0,0 +1,55 @@
|
|||
#if SHARDINGCORE7
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Query;
|
||||
using ShardingCore.Extensions.InternalExtensions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.ShardingMergers;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions;
|
||||
using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
||||
{
|
||||
internal class ExecuteUpdateMethodExecutor<TEntity> : AbstractMethodWrapExecutor<int>
|
||||
{
|
||||
public ExecuteUpdateMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
}
|
||||
|
||||
public override ICircuitBreaker CreateCircuitBreaker()
|
||||
{
|
||||
var circuitBreaker = new NoTripCircuitBreaker(GetStreamMergeContext());
|
||||
circuitBreaker.Register(() =>
|
||||
{
|
||||
Cancel();
|
||||
});
|
||||
return circuitBreaker;
|
||||
}
|
||||
|
||||
public override IShardingMerger<RouteQueryResult<int>> GetShardingMerger()
|
||||
{
|
||||
return new CountMethodShardingMerger(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<int> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var executeUpdateCombineResult = (ExecuteUpdateCombineResult)GetStreamMergeContext().MergeQueryCompilerContext.GetQueryCombineResult();
|
||||
Expression<Func<SetPropertyCalls<TEntity>, SetPropertyCalls<TEntity>>>? setPropertyCallExpression = x=>x;
|
||||
var setPropertyCalls = executeUpdateCombineResult.GetSetPropertyCalls();
|
||||
if (setPropertyCalls != null)
|
||||
{
|
||||
setPropertyCallExpression = (Expression<Func<SetPropertyCalls<TEntity>, SetPropertyCalls<TEntity>>>)setPropertyCalls;
|
||||
}
|
||||
return queryable.As<IQueryable<TEntity>>().ExecuteUpdateAsync(setPropertyCallExpression, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -19,28 +19,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
public CountAsyncInMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
|
||||
{
|
||||
}
|
||||
//
|
||||
// protected override int DoMergeResult(List<RouteQueryResult<int>> resultList)
|
||||
// {
|
||||
//
|
||||
// if (_shardingPageManager.Current != null)
|
||||
// {
|
||||
// int r = 0;
|
||||
// foreach (var routeQueryResult in resultList)
|
||||
// {
|
||||
// _shardingPageManager.Current.RouteQueryResults.Add(new RouteQueryResult<long>(routeQueryResult.DataSourceName, routeQueryResult.TableRouteResult, routeQueryResult.QueryResult));
|
||||
// r += routeQueryResult.QueryResult;
|
||||
// }
|
||||
//
|
||||
// return r;
|
||||
// }
|
||||
// return resultList.Sum(o => o.QueryResult);
|
||||
// }
|
||||
//
|
||||
// protected override IExecutor<RouteQueryResult<int>> CreateExecutor0(bool async)
|
||||
// {
|
||||
// return new CountMethodExecutor<TEntity>(GetStreamMergeContext());
|
||||
// }
|
||||
protected override IExecutor<RouteQueryResult<int>> CreateExecutor()
|
||||
{
|
||||
return new CountMethodExecutor<TEntity>(GetStreamMergeContext());
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
|
||||
{
|
||||
internal class ExecuteDeleteAsyncMemoryMergeEngine<TEntity> : AbstractMethodEnsureWrapMergeEngine<int>
|
||||
{
|
||||
public ExecuteDeleteAsyncMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
|
||||
{
|
||||
}
|
||||
protected override IExecutor<RouteQueryResult<int>> CreateExecutor()
|
||||
{
|
||||
return new ExecuteDeleteMethodExecutor<TEntity>(GetStreamMergeContext());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
#if SHARDINGCORE7
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Methods;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.ShardingMergeEngines
|
||||
{
|
||||
|
||||
internal class ExecuteUpdateAsyncMemoryMergeEngine<TEntity> : AbstractMethodEnsureWrapMergeEngine<int>
|
||||
{
|
||||
public ExecuteUpdateAsyncMemoryMergeEngine(StreamMergeContext streamStreamMergeContext) : base(streamStreamMergeContext)
|
||||
{
|
||||
}
|
||||
protected override IExecutor<RouteQueryResult<int>> CreateExecutor()
|
||||
{
|
||||
return new ExecuteUpdateMethodExecutor<TEntity>(GetStreamMergeContext());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -9,10 +9,10 @@ using System.Linq;
|
|||
using System.Linq.Expressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ShardingCore.Core;
|
||||
using ShardingCore.Extensions.InternalExtensions;
|
||||
|
||||
using ShardingCore.Sharding.MergeEngines;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines;
|
||||
using ShardingCore.Sharding.MergeEngines.ShardingMergeEngines.Abstractions.InMemoryMerge;
|
||||
|
@ -38,6 +38,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
{
|
||||
_streamMergeContextFactory = streamMergeContextFactory;
|
||||
}
|
||||
|
||||
public TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
|
||||
{
|
||||
//如果根表达式为tolist toarray getenumerator等表示需要迭代
|
||||
|
@ -50,7 +51,8 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
}
|
||||
|
||||
|
||||
public TResult ExecuteAsync<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, CancellationToken cancellationToken = new CancellationToken())
|
||||
public TResult ExecuteAsync<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
if (mergeQueryCompilerContext.IsEnumerableQuery())
|
||||
{
|
||||
|
@ -63,55 +65,82 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
}
|
||||
|
||||
|
||||
throw new ShardingCoreException($"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
|
||||
|
||||
throw new ShardingCoreException(
|
||||
$"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
|
||||
}
|
||||
private TResult DoExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken = new CancellationToken())
|
||||
|
||||
private TResult DoExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext, bool async,
|
||||
CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var queryMethodName = mergeQueryCompilerContext.GetQueryMethodName();
|
||||
switch (queryMethodName)
|
||||
{
|
||||
case nameof(Enumerable.First):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.FirstOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstOrDefaultSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Last):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LastSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.LastOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LastOrDefaultSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Single):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.SingleOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleOrDefaultSkipAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Count):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.LongCount):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Any):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.All):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Max):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Min):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Sum):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Average):
|
||||
return EnsureResultTypeMergeExecute3<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,,>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Contains):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
|
||||
}
|
||||
var queryMethodName = mergeQueryCompilerContext.GetQueryMethodName();
|
||||
switch (queryMethodName)
|
||||
{
|
||||
case nameof(Enumerable.First):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.FirstOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstOrDefaultSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Last):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LastSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.LastOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LastOrDefaultSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Single):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.SingleOrDefault):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(SingleOrDefaultSkipAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Count):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.LongCount):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(LongCountAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Any):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(AnyAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.All):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(AllAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Max):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(MaxAsyncInMemoryMergeEngine<,>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Min):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(MinAsyncInMemoryMergeEngine<,>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Sum):
|
||||
return EnsureResultTypeMergeExecute2<TResult>(typeof(SumAsyncInMemoryMergeEngine<,>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Average):
|
||||
return EnsureResultTypeMergeExecute3<TResult>(typeof(AverageAsyncInMemoryMergeEngine<,,>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(Enumerable.Contains):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(ContainsAsyncInMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
|
||||
#if SHARDINGCORE7
|
||||
case nameof(RelationalQueryableExtensions.ExecuteUpdate):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(ExecuteUpdateAsyncMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
case nameof(RelationalQueryableExtensions.ExecuteDelete):
|
||||
return EnsureResultTypeMergeExecute<TResult>(typeof(ExecuteDeleteAsyncMemoryMergeEngine<>),
|
||||
mergeQueryCompilerContext, async, cancellationToken);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
throw new ShardingCoreException($"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
|
||||
throw new ShardingCoreException(
|
||||
$"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression().ShardingPrint()}] result type:[{typeof(TResult).FullName}]");
|
||||
}
|
||||
|
||||
private StreamMergeContext GetStreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext)
|
||||
{
|
||||
return _streamMergeContextFactory.Create(mergeQueryCompilerContext);
|
||||
|
||||
}
|
||||
|
||||
private TResult EnumerableExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
|
||||
{
|
||||
var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
|
||||
|
@ -123,43 +152,53 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
|
||||
}
|
||||
|
||||
private TResult EnsureResultTypeMergeExecute<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
private TResult EnsureResultTypeMergeExecute<TResult>(Type streamMergeEngineType,
|
||||
IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
{
|
||||
var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
|
||||
var queryEntityType = combineQueryable.ElementType;
|
||||
var newStreamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
|
||||
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
|
||||
var streamEngine = Activator.CreateInstance(newStreamMergeEngineType, streamMergeContext);
|
||||
var methodName = async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult);
|
||||
var methodName = async
|
||||
? nameof(IEnsureMergeResult<object>.MergeResultAsync)
|
||||
: nameof(IEnsureMergeResult<object>.MergeResult);
|
||||
var streamEngineMethod = newStreamMergeEngineType.GetMethod(methodName);
|
||||
if (streamEngineMethod == null)
|
||||
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
|
||||
var @params = async ? new object[] { cancellationToken } : new object[0];
|
||||
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
|
||||
}
|
||||
private TResult EnsureResultTypeMergeExecute2<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
private TResult EnsureResultTypeMergeExecute2<TResult>(Type streamMergeEngineType,
|
||||
IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
{
|
||||
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
|
||||
var resultType = (mergeQueryCompilerContext.GetQueryExpression() as MethodCallExpression).GetResultType();
|
||||
streamMergeEngineType = streamMergeEngineType.MakeGenericType(resultType, resultType);
|
||||
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
|
||||
var methodName = async ? nameof(IEnsureMergeResult<object>.MergeResultAsync) : nameof(IEnsureMergeResult<object>.MergeResult);
|
||||
var methodName = async
|
||||
? nameof(IEnsureMergeResult<object>.MergeResultAsync)
|
||||
: nameof(IEnsureMergeResult<object>.MergeResult);
|
||||
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);
|
||||
if (streamEngineMethod == null)
|
||||
throw new ShardingCoreException($"cant found InMemoryAsyncStreamMergeEngine method [{methodName}]");
|
||||
var @params = async ? new object[] { cancellationToken } : new object[0];
|
||||
return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
|
||||
}
|
||||
private TResult EnsureResultTypeMergeExecute3<TResult>(Type streamMergeEngineType, IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
|
||||
private TResult EnsureResultTypeMergeExecute3<TResult>(Type streamMergeEngineType,
|
||||
IMergeQueryCompilerContext mergeQueryCompilerContext, bool async, CancellationToken cancellationToken)
|
||||
{
|
||||
var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult().GetCombineQueryable();
|
||||
var queryEntityType = combineQueryable.ElementType;
|
||||
var resultType = (mergeQueryCompilerContext.GetQueryExpression() as MethodCallExpression).GetResultType();
|
||||
if (async)
|
||||
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult).GetGenericArguments()[0], resultType);
|
||||
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType,
|
||||
typeof(TResult).GetGenericArguments()[0], resultType);
|
||||
else
|
||||
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult), resultType);
|
||||
streamMergeEngineType =
|
||||
streamMergeEngineType.MakeGenericType(queryEntityType, typeof(TResult), resultType);
|
||||
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);
|
||||
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
|
||||
var methodName = async
|
||||
|
@ -233,4 +272,4 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
|
|||
// return (TResult)streamEngineMethod.Invoke(streamEngine, @params);
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ using ShardingCore.Sharding.ShardingExecutors.QueryableCombines;
|
|||
using System;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ShardingCore.Core;
|
||||
using ShardingCore.Extensions.InternalExtensions;
|
||||
|
||||
|
@ -118,10 +119,12 @@ namespace ShardingCore.Sharding.ShardingExecutors
|
|||
case nameof(Queryable.LongCount):
|
||||
case nameof(Queryable.Any):
|
||||
return _whereQueryableCombine;
|
||||
case "ExecuteUpdate":
|
||||
#if SHARDINGCORE7
|
||||
case nameof(RelationalQueryableExtensions.ExecuteUpdate):
|
||||
return _executeUpdateQueryableCombine;
|
||||
case "ExecuteDelete":
|
||||
case nameof(RelationalQueryableExtensions.ExecuteDelete):
|
||||
return _executeDeleteQueryableCombine;
|
||||
#endif
|
||||
case nameof(Queryable.All):
|
||||
return _allQueryableCombine;
|
||||
case nameof(Queryable.Max):
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
|
||||
|
||||
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
|
||||
{
|
||||
public class ExecuteUpdateCombineResult: QueryCombineResult
|
||||
{
|
||||
private readonly LambdaExpression _setPropertyCalls;
|
||||
|
||||
public ExecuteUpdateCombineResult(LambdaExpression setPropertyCalls,IQueryable queryable,IQueryCompilerContext queryCompilerContext) : base(queryable, queryCompilerContext)
|
||||
{
|
||||
_setPropertyCalls = setPropertyCalls;
|
||||
}
|
||||
|
||||
public LambdaExpression GetSetPropertyCalls()
|
||||
{
|
||||
return _setPropertyCalls;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,15 +1,36 @@
|
|||
using System;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using Microsoft.EntityFrameworkCore.Query;
|
||||
using ShardingCore.Exceptions;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
|
||||
|
||||
namespace ShardingCore.Sharding.ShardingExecutors.QueryableCombines
|
||||
{
|
||||
public class ExecuteUpdateQueryableCombine: AbstractQueryableCombine
|
||||
public class ExecuteUpdateQueryableCombine:AbstractQueryableCombine
|
||||
{
|
||||
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression,
|
||||
IQueryCompilerContext queryCompilerContext)
|
||||
public override IQueryable DoCombineQueryable(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
|
||||
{
|
||||
if (!(secondExpression is ConstantExpression))
|
||||
{
|
||||
throw new ShardingCoreInvalidOperationException(queryCompilerContext.GetQueryExpression().ShardingPrint());
|
||||
}
|
||||
|
||||
return queryable;
|
||||
}
|
||||
|
||||
public override QueryCombineResult GetDefaultQueryCombineResult(IQueryable queryable, Expression secondExpression, IQueryCompilerContext queryCompilerContext)
|
||||
{
|
||||
|
||||
LambdaExpression setPropertyCalls = null;
|
||||
if (secondExpression is UnaryExpression where && where.Operand is LambdaExpression lambdaExpression)
|
||||
{
|
||||
setPropertyCalls = lambdaExpression;
|
||||
}
|
||||
|
||||
return new ExecuteUpdateCombineResult(setPropertyCalls,queryable, queryCompilerContext);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue