#11 支持单个查询的追踪 first firstordefault last lastordefault single singleordefault....

This commit is contained in:
xuejiaming 2021-09-24 10:39:08 +08:00
parent a66a19d452
commit 017b3e0082
9 changed files with 121 additions and 27 deletions

View File

@ -79,17 +79,17 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
{ {
case nameof(Enumerable.First): case nameof(Enumerable.First):
return GenericMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault): case nameof(Enumerable.FirstOrDefault):
return GenericMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(FirstOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Last): case nameof(Enumerable.Last):
return GenericMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(LastAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LastOrDefault): case nameof(Enumerable.LastOrDefault):
return GenericMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(LastOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Single): case nameof(Enumerable.Single):
return GenericMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.SingleOrDefault): case nameof(Enumerable.SingleOrDefault):
return GenericMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return GenericShardingDbContextMergeExecute<TResult>(typeof(SingleOrDefaultAsyncInMemoryMergeEngine<,>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.Count): case nameof(Enumerable.Count):
return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken); return EnsureMergeExecute<TResult>(typeof(CountAsyncInMemoryMergeEngine<>), shardingDbContext, methodCallExpression, async, cancellationToken);
case nameof(Enumerable.LongCount): case nameof(Enumerable.LongCount):
@ -144,11 +144,11 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
} }
private TResult GenericMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken) private TResult GenericShardingDbContextMergeExecute<TResult>(Type streamMergeEngineType, IShardingDbContext shardingDbContext, MethodCallExpression query, bool async, CancellationToken cancellationToken)
{ {
var queryEntityType = query.GetQueryEntityType(); var queryEntityType = query.GetQueryEntityType();
var resultEntityType = query.GetResultType(); var resultEntityType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType); streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(),queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext); var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult); var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName); var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);

View File

@ -58,7 +58,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
_mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext); _mergeContext = ((IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(shardingDbContext.GetType()))).Create(_queryable, shardingDbContext);
_parllelDbbContexts = new List<DbContext>(); _parllelDbbContexts = new LinkedList<DbContext>();
} }
/// <summary> /// <summary>
/// 合并queryable /// 合并queryable

View File

@ -0,0 +1,94 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/24 10:16:28
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> where TShardingDbContext:DbContext,IShardingDbContext
{
private readonly ITrackerManager<TShardingDbContext> _trackerManager;
protected AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_trackerManager = ShardingContainer.GetService<ITrackerManager<TShardingDbContext>>();
}
private bool IsUseTrack => GetIsUseTracker();
private bool GetIsUseTracker()
{
if (GetStreamMergeContext().IsNoTracking.HasValue)
{
return !GetStreamMergeContext().IsNoTracking.Value;
}
else
{
return ((DbContext)GetStreamMergeContext().GetShardingDbContext()).ChangeTracker.QueryTrackingBehavior ==
QueryTrackingBehavior.TrackAll;
}
}
public override TResult MergeResult<TResult>()
{
var current = DoMergeResult<TResult>();
if (current != null)
{
if (IsUseTrack && _trackerManager.EntityUseTrack(current.GetType()))
{
var c = (object)current;
var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c);
var attachedEntity = genericDbContext.GetAttachedEntity(c);
if (attachedEntity == null)
genericDbContext.Attach(current);
else
{
return (TResult)attachedEntity;
}
}
}
return current;
}
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var current = await DoMergeResultAsync<TResult>(cancellationToken);
if (current != null)
{
if (IsUseTrack && _trackerManager.EntityUseTrack(current.GetType()))
{
var c = (object)current;
var genericDbContext = GetStreamMergeContext().GetShardingDbContext().CreateGenericDbContext(c);
var attachedEntity = genericDbContext.GetAttachedEntity(c);
if (attachedEntity == null)
genericDbContext.Attach(current);
else
{
return (TResult)attachedEntity;
}
}
}
return current;
}
public abstract TResult DoMergeResult<TResult>();
public abstract Task<TResult> DoMergeResultAsync<TResult>(
CancellationToken cancellationToken = new CancellationToken());
}
}

View File

@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class FirstAsyncInMemoryMergeEngine<TEntity>: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class FirstAsyncInMemoryMergeEngine<TShardingDbContext,TEntity>: AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext,TEntity> where TShardingDbContext:DbContext,IShardingDbContext
{ {
public FirstAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public FirstAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();

View File

@ -24,18 +24,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class FirstOrDefaultAsyncInMemoryMergeEngine<TEntity> : AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class FirstOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public FirstOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public FirstOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).FirstOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();

View File

@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class LastAsyncInMemoryMergeEngine<TEntity>: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class LastAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public LastAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public LastAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastAsync(cancellationToken), cancellationToken);

View File

@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class LastOrDefaultAsyncInMemoryMergeEngine<TEntity>: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class LastOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public LastOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public LastOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).LastOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();

View File

@ -21,18 +21,18 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class SingleAsyncInMemoryMergeEngine<TEntity>: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class SingleAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public SingleAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public SingleAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();

View File

@ -21,19 +21,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class SingleOrDefaultAsyncInMemoryMergeEngine<TEntity>: AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity> public class SingleOrDefaultAsyncInMemoryMergeEngine<TShardingDbContext,TEntity> : AbstractTrackGenericMethodCallWhereInMemoryAsyncMergeEngine<TShardingDbContext, TEntity> where TShardingDbContext : DbContext, IShardingDbContext
{ {
public SingleOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext) public SingleOrDefaultAsyncInMemoryMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{ {
} }
public override TResult MergeResult<TResult>() public override TResult DoMergeResult<TResult>()
{ {
return AsyncHelper.RunSync(() => MergeResultAsync<TResult>()); return AsyncHelper.RunSync(() => MergeResultAsync<TResult>());
} }
public override async Task<TResult> MergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken()) public override async Task<TResult> DoMergeResultAsync<TResult>(CancellationToken cancellationToken = new CancellationToken())
{ {
var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken); var result = await base.ExecuteAsync( queryable => ((IQueryable<TResult>)queryable).SingleOrDefaultAsync(cancellationToken), cancellationToken);
var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable(); var q = result.Where(o => o != null&&o.QueryResult!=null).Select(o=>o.QueryResult).AsQueryable();