DbContext查询支持读写分离的追踪

This commit is contained in:
xuejiaming 2021-12-31 15:31:48 +08:00
parent 863ba69a03
commit d4a35698cd
37 changed files with 743 additions and 112 deletions

View File

@ -1,9 +1,9 @@
:start :start
::定义版本 ::定义版本
set EFCORE2=2.3.2.08 set EFCORE2=2.3.2.09
set EFCORE3=3.3.2.08 set EFCORE3=3.3.2.09
set EFCORE5=5.3.2.08 set EFCORE5=5.3.2.09
set EFCORE6=6.3.2.08 set EFCORE6=6.3.2.09
::删除所有bin与obj下的文件 ::删除所有bin与obj下的文件
@echo off @echo off

View File

@ -26,8 +26,9 @@ namespace Sample.NoShardingMultiLevel.Controllers
[HttpGet] [HttpGet]
public async Task<IActionResult> Get() public async Task<IActionResult> Get()
{ {
//var dbContext = _defaultDbContext.GetDbContext("ds0",false,new SingleQueryRouteTail(string.Empty)); //var dbContext = _defaultDbContext.GetDbContext("ds0",false,new SingleQueryRouteTail(string.Empty));.Select(o=>new {o.Id,o.Name,o.Company})
var boss =await _defaultDbContext.Set<Boss>().Include(o=>o.Company).FirstOrDefaultAsync(); var boss =await _defaultDbContext.Set<Boss>().Include(o=>o.Company).FirstOrDefaultAsync();
//_defaultDbContext.Attach(boss);
if (boss!=null) if (boss!=null)
{ {
var companyId = boss.Company.Id; var companyId = boss.Company.Id;

View File

@ -2,6 +2,7 @@ using Microsoft.EntityFrameworkCore;
using Sample.NoShardingMultiLevel; using Sample.NoShardingMultiLevel;
using ShardingCore; using ShardingCore;
using ShardingCore.Bootstrapers; using ShardingCore.Bootstrapers;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.TableExists; using ShardingCore.TableExists;
ILoggerFactory efLogger = LoggerFactory.Create(builder => ILoggerFactory efLogger = LoggerFactory.Create(builder =>
@ -28,6 +29,18 @@ builder.Services.AddShardingDbContext<DefaultDbContext>((conStr, builder) => bui
}) })
.AddDefaultDataSource("ds0", "Data Source=localhost;Initial Catalog=dbmulti;Integrated Security=True;") .AddDefaultDataSource("ds0", "Data Source=localhost;Initial Catalog=dbmulti;Integrated Security=True;")
.AddTableEnsureManager(sp => new SqlServerTableEnsureManager<DefaultDbContext>()) .AddTableEnsureManager(sp => new SqlServerTableEnsureManager<DefaultDbContext>())
.AddReadWriteSeparation(sp =>
{
return new Dictionary<string, IEnumerable<string>>()
{
{
"ds0", new List<string>()
{
"Data Source=localhost;Initial Catalog=dbmulti;Integrated Security=True;"
}
}
};
}, ReadStrategyEnum.Loop, defaultEnable: true)
.End(); .End();
var app = builder.Build(); var app = builder.Build();

View File

@ -1,4 +1,6 @@
using Microsoft.EntityFrameworkCore; using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Sample.SqlServerShardingTable.Entities; using Sample.SqlServerShardingTable.Entities;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions; using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding; using ShardingCore.Sharding;
@ -54,5 +56,17 @@ namespace Sample.SqlServerShardingTable
/// empty impl /// empty impl
/// </summary> /// </summary>
public IRouteTail RouteTail { get; set; } public IRouteTail RouteTail { get; set; }
public override void Dispose()
{
Console.WriteLine("MyDbContext disposed");
base.Dispose();
}
public override ValueTask DisposeAsync()
{
Console.WriteLine("MyDbContext disposed async");
return base.DisposeAsync();
}
} }
} }

View File

@ -57,7 +57,18 @@ namespace Sample.SqlServerShardingTable
op.AddShardingTableRoute<SysUserVirtualTableRoute>(); op.AddShardingTableRoute<SysUserVirtualTableRoute>();
op.AddShardingTableRoute<OrderVirtualTableRoute>(); op.AddShardingTableRoute<OrderVirtualTableRoute>();
op.AddShardingTableRoute<MultiShardingOrderVirtualTableRoute>(); op.AddShardingTableRoute<MultiShardingOrderVirtualTableRoute>();
}).End(); }).AddReadWriteSeparation(sp =>
{
return new Dictionary<string, IEnumerable<string>>()
{
{
"ds0", new List<string>()
{
"Data Source=localhost;Initial Catalog=EFCoreShardingTableDB;Integrated Security=True;"
}
}
};
},ReadStrategyEnum.Loop,defaultEnable:true).End();
} }
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

View File

@ -42,7 +42,6 @@ namespace ShardingCore.Bootstrapers
private readonly string _virtualTableName; private readonly string _virtualTableName;
private readonly Expression<Func<TEntity,bool>> _queryFilterExpression; private readonly Expression<Func<TEntity,bool>> _queryFilterExpression;
private readonly IShardingConfigOption<TShardingDbContext> _shardingConfigOption; private readonly IShardingConfigOption<TShardingDbContext> _shardingConfigOption;
private readonly ITrackerManager<TShardingDbContext> _trackerManager;
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource; private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager; private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager;
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager; private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
@ -50,7 +49,7 @@ namespace ShardingCore.Bootstrapers
public EntityMetadataInitializer(EntityMetadataEnsureParams entityMetadataEnsureParams public EntityMetadataInitializer(EntityMetadataEnsureParams entityMetadataEnsureParams
, IShardingConfigOption<TShardingDbContext> shardingConfigOption, , IShardingConfigOption<TShardingDbContext> shardingConfigOption,
ITrackerManager<TShardingDbContext> trackerManager,IVirtualDataSource<TShardingDbContext> virtualDataSource,IVirtualTableManager<TShardingDbContext> virtualTableManager, IVirtualDataSource<TShardingDbContext> virtualDataSource,IVirtualTableManager<TShardingDbContext> virtualTableManager,
IEntityMetadataManager<TShardingDbContext> entityMetadataManager, IEntityMetadataManager<TShardingDbContext> entityMetadataManager,
ILogger<EntityMetadataInitializer<TShardingDbContext, TEntity>> logger ILogger<EntityMetadataInitializer<TShardingDbContext, TEntity>> logger
) )
@ -59,7 +58,6 @@ namespace ShardingCore.Bootstrapers
_virtualTableName = entityMetadataEnsureParams.VirtualTableName; _virtualTableName = entityMetadataEnsureParams.VirtualTableName;
_queryFilterExpression = entityMetadataEnsureParams.EntityType.GetAnnotations().FirstOrDefault(o=>o.Name== QueryFilter)?.Value as Expression<Func<TEntity, bool>>; _queryFilterExpression = entityMetadataEnsureParams.EntityType.GetAnnotations().FirstOrDefault(o=>o.Name== QueryFilter)?.Value as Expression<Func<TEntity, bool>>;
_shardingConfigOption = shardingConfigOption; _shardingConfigOption = shardingConfigOption;
_trackerManager = trackerManager;
_virtualDataSource = virtualDataSource; _virtualDataSource = virtualDataSource;
_virtualTableManager = virtualTableManager; _virtualTableManager = virtualTableManager;
_entityMetadataManager = entityMetadataManager; _entityMetadataManager = entityMetadataManager;
@ -75,7 +73,6 @@ namespace ShardingCore.Bootstrapers
public void Initialize() public void Initialize()
{ {
var shardingEntityType = _entityType.ClrType; var shardingEntityType = _entityType.ClrType;
_trackerManager.AddDbContextModel(shardingEntityType);
var entityMetadata = new EntityMetadata(shardingEntityType, _virtualTableName,typeof(TShardingDbContext),_entityType.FindPrimaryKey().Properties.Select(o=>o.PropertyInfo).ToList(),_queryFilterExpression); var entityMetadata = new EntityMetadata(shardingEntityType, _virtualTableName,typeof(TShardingDbContext),_entityType.FindPrimaryKey().Properties.Select(o=>o.PropertyInfo).ToList(),_queryFilterExpression);
if (!_entityMetadataManager.AddEntityMetadata(entityMetadata)) if (!_entityMetadataManager.AddEntityMetadata(entityMetadata))
throw new ShardingCoreInvalidOperationException($"repeat add entity metadata {shardingEntityType.FullName}"); throw new ShardingCoreInvalidOperationException($"repeat add entity metadata {shardingEntityType.FullName}");

View File

@ -59,13 +59,15 @@ namespace ShardingCore.Bootstrapers
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager; private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
private readonly IParallelTableManager<TShardingDbContext> _parallelTableManager; private readonly IParallelTableManager<TShardingDbContext> _parallelTableManager;
private readonly IDataSourceInitializer<TShardingDbContext> _dataSourceInitializer; private readonly IDataSourceInitializer<TShardingDbContext> _dataSourceInitializer;
private readonly ITrackerManager<TShardingDbContext> _trackerManager;
private readonly Type _shardingDbContextType; private readonly Type _shardingDbContextType;
public ShardingDbContextBootstrapper(IShardingConfigOption<TShardingDbContext> shardingConfigOption, public ShardingDbContextBootstrapper(IShardingConfigOption<TShardingDbContext> shardingConfigOption,
IEntityMetadataManager<TShardingDbContext> entityMetadataManager, IEntityMetadataManager<TShardingDbContext> entityMetadataManager,
IVirtualDataSource<TShardingDbContext> virtualDataSource, IVirtualDataSource<TShardingDbContext> virtualDataSource,
IParallelTableManager<TShardingDbContext> parallelTableManager, IParallelTableManager<TShardingDbContext> parallelTableManager,
IDataSourceInitializer<TShardingDbContext> dataSourceInitializer) IDataSourceInitializer<TShardingDbContext> dataSourceInitializer,
ITrackerManager<TShardingDbContext> trackerManager)
{ {
_shardingConfigOption = shardingConfigOption; _shardingConfigOption = shardingConfigOption;
_shardingDbContextType = typeof(TShardingDbContext); _shardingDbContextType = typeof(TShardingDbContext);
@ -73,6 +75,7 @@ namespace ShardingCore.Bootstrapers
_virtualDataSource= virtualDataSource; _virtualDataSource= virtualDataSource;
_parallelTableManager = parallelTableManager; _parallelTableManager = parallelTableManager;
_dataSourceInitializer = dataSourceInitializer; _dataSourceInitializer = dataSourceInitializer;
_trackerManager = trackerManager;
} }
/// <summary> /// <summary>
/// 初始化 /// 初始化
@ -97,6 +100,7 @@ namespace ShardingCore.Bootstrapers
foreach (var entity in context.Model.GetEntityTypes()) foreach (var entity in context.Model.GetEntityTypes())
{ {
var entityType = entity.ClrType; var entityType = entity.ClrType;
_trackerManager.AddDbContextModel(entityType);
//entity.GetAnnotation("") //entity.GetAnnotation("")
if (_shardingConfigOption.HasVirtualDataSourceRoute(entityType) || if (_shardingConfigOption.HasVirtualDataSourceRoute(entityType) ||
_shardingConfigOption.HasVirtualTableRoute(entityType)) _shardingConfigOption.HasVirtualTableRoute(entityType))

View File

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Core.QueryTrackers
{
public interface IQueryTracker
{
public object Track(object entity,IShardingDbContext shardingDbContext);
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Core.QueryTrackers
{
public class QueryTracker : IQueryTracker
{
public object Track(object entity, IShardingDbContext shardingDbContext)
{
var genericDbContext = shardingDbContext.CreateGenericDbContext(entity);
var attachedEntity = genericDbContext.GetAttachedEntity(entity);
if (attachedEntity == null)
genericDbContext.Attach(entity);
else
{
return attachedEntity;
}
return null;
}
}
}

View File

@ -29,11 +29,13 @@ using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors; using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.TableCreator; using ShardingCore.TableCreator;
using System; using System;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.DynamicDataSources; using ShardingCore.DynamicDataSources;
using ShardingCore.Sharding.ParallelTables; using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations; using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions; using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Sharding.ShardingExecutors; using ShardingCore.Sharding.ShardingExecutors;
using ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries;
using ShardingCore.TableExists; using ShardingCore.TableExists;
namespace ShardingCore namespace ShardingCore
@ -127,6 +129,10 @@ namespace ShardingCore
services.TryAddSingleton<IShardingPageManager, ShardingPageManager>(); services.TryAddSingleton<IShardingPageManager, ShardingPageManager>();
services.TryAddSingleton<IShardingPageAccessor, ShardingPageAccessor>(); services.TryAddSingleton<IShardingPageAccessor, ShardingPageAccessor>();
services.TryAddSingleton<IShardingBootstrapper, ShardingBootstrapper>(); services.TryAddSingleton<IShardingBootstrapper, ShardingBootstrapper>();
services.TryAddSingleton<IQueryTracker, QueryTracker>();
services.TryAddSingleton<IShardingTrackQueryExecutor, DefaultShardingTrackQueryExecutor>();
services.TryAddSingleton<INativeEnumeratorTrackQueryExecutor, NativeEnumeratorTrackQueryExecutor>();
services.TryAddSingleton<INativeSingleTrackQueryExecutor, NativeSingleTrackQueryExecutor>();
services.TryAddShardingJob(); services.TryAddShardingJob();
return services; return services;

View File

@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Extensions
{
public static class QueryCompilerContextExtension
{
public static Type GetQueryableEntityType( this IQueryCompilerContext queryCompilerContext)
{
if (queryCompilerContext.IsEnumerableQuery())
{
return GetEnumerableQueryEntityType(queryCompilerContext);
}
else
{
return (queryCompilerContext.GetQueryExpression() as MethodCallExpression)
.GetQueryEntityType();
}
}
private static Type GetEnumerableQueryEntityType(IQueryCompilerContext queryCompilerContext)
{
return queryCompilerContext.GetQueryExpression().Type.GetGenericArguments()[0];
}
public static bool IsEntityQuery(this IQueryCompilerContext queryCompilerContext)
{
if (queryCompilerContext.GetQueryExpression() is MethodCallExpression methodCallExpression)
{
var name = methodCallExpression.Method.Name;
switch (name)
{
case nameof(Queryable.First):
case nameof(Queryable.FirstOrDefault):
case nameof(Queryable.Last):
case nameof(Queryable.LastOrDefault):
case nameof(Queryable.Single):
case nameof(Queryable.SingleOrDefault):
return true;
}
}
return false;
}
}
}

View File

@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.Abstractions
{
public interface IShardingTrackQueryExecutor
{
/// <summary>
/// execute query
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="queryCompilerContext"></param>
/// <returns></returns>
TResult Execute<TResult>(IQueryCompilerContext queryCompilerContext);
#if !EFCORE2
/// <summary>
/// execute query async
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="queryCompilerContext"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
TResult ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken = new CancellationToken());
#endif
#if EFCORE2
/// <summary>
/// execute query async
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="queryCompilerContext"></param>
/// <param name="query"></param>
/// <returns></returns>
IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext);
Task<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken);
#endif
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.TrackerEnumerators;
namespace ShardingCore.Sharding.Enumerators.TrackerEnumerables
{
public class AsyncTrackerEnumerable<T> : IAsyncEnumerable<T>
{
private readonly IShardingDbContext _shardingDbContext;
private readonly IAsyncEnumerable<T> _asyncEnumerable;
public AsyncTrackerEnumerable(IShardingDbContext shardingDbContext, IAsyncEnumerable<T> asyncEnumerable)
{
_shardingDbContext = shardingDbContext;
_asyncEnumerable = asyncEnumerable;
}
#if !EFCORE2
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
return new AsyncTrackerEnumerator<T>(_shardingDbContext,_asyncEnumerable.GetAsyncEnumerator(cancellationToken));
}
#endif
#if EFCORE2
public IAsyncEnumerator<T> GetEnumerator()
{
return new AsyncTrackerEnumerator<T>(_shardingDbContext, _asyncEnumerable.GetEnumerator());
}
#endif
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators.TrackerEnumerators;
namespace ShardingCore.Sharding.Enumerators.TrackerEnumerables
{
public class TrackEnumerable<T>:IEnumerable<T>
{
private readonly IShardingDbContext _shardingDbContext;
private readonly IEnumerable<T> _enumerable;
public TrackEnumerable(IShardingDbContext shardingDbContext,IEnumerable<T> enumerable)
{
_shardingDbContext = shardingDbContext;
_enumerable = enumerable;
}
public IEnumerator<T> GetEnumerator()
{
return new TrackerEnumerator<T>(_shardingDbContext,_enumerable.GetEnumerator());
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
}

View File

@ -1,7 +1,9 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
{ {
@ -16,13 +18,15 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
internal class AsyncTrackerEnumerator<T> : IAsyncEnumerator<T> internal class AsyncTrackerEnumerator<T> : IAsyncEnumerator<T>
{ {
private readonly StreamMergeContext<T> _streamMergeContext; private readonly IShardingDbContext _shardingDbContext;
private readonly IAsyncEnumerator<T> _asyncEnumerator; private readonly IAsyncEnumerator<T> _asyncEnumerator;
private readonly IQueryTracker _queryTrack;
public AsyncTrackerEnumerator(StreamMergeContext<T> streamMergeContext, IAsyncEnumerator<T> asyncEnumerator) public AsyncTrackerEnumerator(IShardingDbContext shardingDbContext, IAsyncEnumerator<T> asyncEnumerator)
{ {
_streamMergeContext = streamMergeContext; _shardingDbContext = shardingDbContext;
_asyncEnumerator = asyncEnumerator; _asyncEnumerator = asyncEnumerator;
_queryTrack = ShardingContainer.GetService<IQueryTracker>();
} }
public ValueTask DisposeAsync() public ValueTask DisposeAsync()
{ {
@ -40,12 +44,8 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
var current = _asyncEnumerator.Current; var current = _asyncEnumerator.Current;
if (current != null) if (current != null)
{ {
var c = (object)current; var attachedEntity = _queryTrack.Track(current, _shardingDbContext);
var genericDbContext = _streamMergeContext.GetShardingDbContext().CreateGenericDbContext(c); if (attachedEntity!=null)
var attachedEntity = genericDbContext.GetAttachedEntity(c);
if (attachedEntity==null)
genericDbContext.Attach(current);
else
{ {
return (T)attachedEntity; return (T)attachedEntity;
} }
@ -60,13 +60,15 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
public class AsyncTrackerEnumerator<T> : IAsyncEnumerator<T> public class AsyncTrackerEnumerator<T> : IAsyncEnumerator<T>
{ {
private readonly StreamMergeContext<T> _streamMergeContext; private readonly IShardingDbContext _shardingDbContext;
private readonly IAsyncEnumerator<T> _asyncEnumerator; private readonly IAsyncEnumerator<T> _asyncEnumerator;
private readonly IQueryTracker _queryTrack;
public AsyncTrackerEnumerator(StreamMergeContext<T> streamMergeContext, IAsyncEnumerator<T> asyncEnumerator) public AsyncTrackerEnumerator(IShardingDbContext shardingDbContext, IAsyncEnumerator<T> asyncEnumerator)
{ {
_streamMergeContext = streamMergeContext; _shardingDbContext = shardingDbContext;
_asyncEnumerator = asyncEnumerator; _asyncEnumerator = asyncEnumerator;
_queryTrack = ShardingContainer.GetService<IQueryTracker>();
} }
public Task<bool> MoveNext(CancellationToken cancellationToken) public Task<bool> MoveNext(CancellationToken cancellationToken)
@ -80,12 +82,8 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
var current = _asyncEnumerator.Current; var current = _asyncEnumerator.Current;
if (current != null) if (current != null)
{ {
var c = (object)current; var attachedEntity = _queryTrack.Track(current, _shardingDbContext);
var genericDbContext = _streamMergeContext.GetShardingDbContext().CreateGenericDbContext(c); if (attachedEntity != null)
var attachedEntity = genericDbContext.GetAttachedEntity(c);
if (attachedEntity==null)
genericDbContext.Attach(current);
else
{ {
return (T)attachedEntity; return (T)attachedEntity;
} }

View File

@ -1,6 +1,8 @@
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Extensions; using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
{ {
@ -13,13 +15,15 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
*/ */
internal class TrackerEnumerator<T>: IEnumerator<T> internal class TrackerEnumerator<T>: IEnumerator<T>
{ {
private readonly StreamMergeContext<T> _streamMergeContext; private readonly IShardingDbContext _shardingDbContext;
private readonly IEnumerator<T> _enumerator; private readonly IEnumerator<T> _enumerator;
private readonly IQueryTracker _queryTrack;
public TrackerEnumerator(StreamMergeContext<T> streamMergeContext,IEnumerator<T> enumerator) public TrackerEnumerator(IShardingDbContext shardingDbContext,IEnumerator<T> enumerator)
{ {
_streamMergeContext = streamMergeContext; _shardingDbContext = shardingDbContext;
_enumerator = enumerator; _enumerator = enumerator;
_queryTrack = ShardingContainer.GetService<IQueryTracker>();
} }
public bool MoveNext() public bool MoveNext()
{ {
@ -44,16 +48,10 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
var current = _enumerator.Current; var current = _enumerator.Current;
if (current != null) if (current != null)
{ {
var c = (object)current; var attachedEntity = _queryTrack.Track(current, _shardingDbContext);
var genericDbContext = _streamMergeContext.GetShardingDbContext().CreateGenericDbContext(c); if (attachedEntity != null)
var attachedEntity = genericDbContext.GetAttachedEntity(c);
if (attachedEntity == null)
{ {
genericDbContext.Attach(current); return (T)attachedEntity;
}
else
{
return (T)attachedEntity;
} }
} }
return current; return current;

View File

@ -38,7 +38,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
if (_mergeContext.IsUseShardingTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator); return new AsyncTrackerEnumerator<T>(_mergeContext.GetShardingDbContext(), asyncEnumerator);
} }
return asyncEnumerator; return asyncEnumerator;
@ -55,7 +55,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
.GetEnumerator(); .GetEnumerator();
if (_mergeContext.IsUseShardingTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new AsyncTrackerEnumerator<T>(_mergeContext, asyncEnumerator); return new AsyncTrackerEnumerator<T>(_mergeContext.GetShardingDbContext(), asyncEnumerator);
} }
return asyncEnumerator; return asyncEnumerator;
} }
@ -72,7 +72,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
if (_mergeContext.IsUseShardingTrack(typeof(T))) if (_mergeContext.IsUseShardingTrack(typeof(T)))
{ {
return new TrackerEnumerator<T>(_mergeContext, enumerator); return new TrackerEnumerator<T>(_mergeContext.GetShardingDbContext(), enumerator);
} }
return enumerator; return enumerator;
} }

View File

@ -18,7 +18,6 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
bool IsCrossTable(); bool IsCrossTable();
bool IsCrossDataSource(); bool IsCrossDataSource();
bool IsEnumerableQuery(); //bool IsEnumerableQuery();
bool IsParallelQuery();
} }
} }

View File

@ -25,7 +25,7 @@ namespace ShardingCore.Sharding.ShardingExecutors.Abstractions
/// 当前是否读写分离走读库(包括是否启用读写分离和是否当前的dbcontext启用了读库查询) /// 当前是否读写分离走读库(包括是否启用读写分离和是否当前的dbcontext启用了读库查询)
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
bool CurrentQueryReadConnection(); bool IsParallelQuery();
/// <summary> /// <summary>
/// 是否是未追踪查询 /// 是否是未追踪查询
/// </summary> /// </summary>

View File

@ -1,43 +1,25 @@
using System; using ShardingCore.Sharding.Abstractions;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Utils;
namespace ShardingCore.Sharding.ShardingExecutors namespace ShardingCore.Sharding.ShardingExecutors
{ {
public class DefaultShardingComplierExecutor: IShardingComplierExecutor public class DefaultShardingComplierExecutor: IShardingComplierExecutor
{ {
private readonly IShardingQueryExecutor _shardingQueryExecutor; private readonly IShardingTrackQueryExecutor _shardingTrackQueryExecutor;
private readonly IQueryCompilerContextFactory _queryCompilerContextFactory; private readonly IQueryCompilerContextFactory _queryCompilerContextFactory;
public DefaultShardingComplierExecutor(IShardingQueryExecutor shardingQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory) public DefaultShardingComplierExecutor(IShardingTrackQueryExecutor shardingTrackQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory)
{ {
_shardingQueryExecutor = shardingQueryExecutor; _shardingTrackQueryExecutor = shardingTrackQueryExecutor;
_queryCompilerContextFactory = queryCompilerContextFactory; _queryCompilerContextFactory = queryCompilerContextFactory;
} }
public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query) public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{ {
var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query);
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor(); return _shardingTrackQueryExecutor.Execute<TResult>(queryCompilerContext);
if (queryCompilerExecutor != null)
{
return queryCompilerExecutor.GetQueryCompiler().Execute<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
}
if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.Execute<TResult>(mergeCompilerContext);
} }
#if !EFCORE2 #if !EFCORE2
@ -46,15 +28,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
CancellationToken cancellationToken = new CancellationToken()) CancellationToken cancellationToken = new CancellationToken())
{ {
var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query);
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor(); return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext);
if (queryCompilerExecutor != null)
{
return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
}
if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<TResult>(mergeCompilerContext, cancellationToken);
} }
#endif #endif
@ -62,30 +36,14 @@ namespace ShardingCore.Sharding.ShardingExecutors
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query) public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query)
{ {
var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query);
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor(); return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext);
if (queryCompilerExecutor != null)
{
return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
}
if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(mergeCompilerContext);
} }
public Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query, public Task<TResult> ExecuteAsync<TResult>(IShardingDbContext shardingDbContext, Expression query,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query); var queryCompilerContext = _queryCompilerContextFactory.Create(shardingDbContext, query);
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor(); return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext, cancellationToken);
if (queryCompilerExecutor != null)
{
return queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
}
if (!(queryCompilerContext is IMergeQueryCompilerContext mergeCompilerContext))
throw new ShardingCoreInvalidOperationException(
$"{nameof(queryCompilerContext)} is not {nameof(IMergeQueryCompilerContext)}");
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(mergeCompilerContext, cancellationToken);
} }
#endif #endif
} }

View File

@ -64,7 +64,6 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
{ {
switch (methodCallExpression.Method.Name) switch (methodCallExpression.Method.Name)
{ {
case nameof(Enumerable.First): case nameof(Enumerable.First):
return EnsureResultTypeMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken); return EnsureResultTypeMergeExecute<TResult>(typeof(FirstAsyncInMemoryMergeEngine<>), mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault): case nameof(Enumerable.FirstOrDefault):

View File

@ -0,0 +1,225 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
using ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries;
namespace ShardingCore.Sharding.ShardingExecutors
{
public class DefaultShardingTrackQueryExecutor: IShardingTrackQueryExecutor
{
private static readonly MethodInfo NativeSingleTrackQueryExecutorTrack
= typeof(NativeSingleTrackQueryExecutor)
.GetMethod(
nameof(NativeSingleTrackQueryExecutor.Track),
BindingFlags.Instance | BindingFlags.Public
);
private static readonly MethodInfo NativeSingleTrackQueryExecutorTrackAsync
= typeof(NativeSingleTrackQueryExecutor)
.GetMethod(
nameof(NativeSingleTrackQueryExecutor.TrackAsync),
BindingFlags.Instance | BindingFlags.Public
);
private static readonly MethodInfo NativeEnumeratorTrackQueryExecutorTrack
= typeof(NativeEnumeratorTrackQueryExecutor)
.GetMethod(
nameof(NativeEnumeratorTrackQueryExecutor.Track),
BindingFlags.Instance | BindingFlags.Public
);
private static readonly MethodInfo NativeEnumeratorTrackQueryExecutorTrackAsync
= typeof(NativeEnumeratorTrackQueryExecutor)
.GetMethod(
nameof(NativeEnumeratorTrackQueryExecutor.TrackAsync),
BindingFlags.Instance | BindingFlags.Public
);
private readonly IShardingQueryExecutor _shardingQueryExecutor;
private readonly INativeEnumeratorTrackQueryExecutor _nativeEnumeratorTrackQueryExecutor;
private readonly INativeSingleTrackQueryExecutor _nativeSingleTrackQueryExecutor;
public DefaultShardingTrackQueryExecutor(IShardingQueryExecutor shardingQueryExecutor, INativeEnumeratorTrackQueryExecutor nativeEnumeratorTrackQueryExecutor,INativeSingleTrackQueryExecutor nativeSingleTrackQueryExecutor)
{
_shardingQueryExecutor = shardingQueryExecutor;
_nativeEnumeratorTrackQueryExecutor = nativeEnumeratorTrackQueryExecutor;
_nativeSingleTrackQueryExecutor = nativeSingleTrackQueryExecutor;
}
public TResult Execute<TResult>(IQueryCompilerContext queryCompilerContext)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.Execute<TResult>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
var result = queryCompilerExecutor.GetQueryCompiler().Execute<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
//native query
if (queryCompilerContext.IsParallelQuery() && queryCompilerContext.IsQueryTrack())
{
var queryEntityType = queryCompilerContext.GetQueryableEntityType();
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(queryEntityType))
{
if (queryCompilerContext.IsEnumerableQuery())
{
return NativeExecute(_nativeEnumeratorTrackQueryExecutor, NativeEnumeratorTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
else if (queryCompilerContext.IsEntityQuery())
{
return NativeExecute(_nativeSingleTrackQueryExecutor, NativeSingleTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
}
return result;
}
return result;
}
private TResult NativeExecute<TResult>(object executor, MethodInfo executorMethod,
IQueryCompilerContext queryCompilerContext,Type queryEntityType, TResult result)
{
return (TResult)executorMethod
.MakeGenericMethod(queryEntityType)
.Invoke(executor, new object[] { queryCompilerContext, result });
}
#if !EFCORE2
public TResult ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext,
CancellationToken cancellationToken = new CancellationToken())
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.ExecuteAsync<TResult>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
var result = queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
//native query
if (queryCompilerContext.IsParallelQuery()&&queryCompilerContext.IsQueryTrack())
{
var queryEntityType = queryCompilerContext.GetQueryableEntityType();
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(queryEntityType))
{
if (queryCompilerContext.IsEnumerableQuery())
{
return NativeExecute(_nativeEnumeratorTrackQueryExecutor, NativeEnumeratorTrackQueryExecutorTrackAsync,
queryCompilerContext,queryEntityType, result);
}
else if (queryCompilerContext.IsEntityQuery())
{
return NativeExecute(_nativeSingleTrackQueryExecutor, NativeSingleTrackQueryExecutorTrackAsync,
queryCompilerContext, queryEntityType, result);
}
}
return result;
}
return result;
}
#endif
#if EFCORE2
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
var result = queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression());
//native query
if (queryCompilerContext.IsParallelQuery()&&queryCompilerContext.IsQueryTrack())
{
var queryEntityType = queryCompilerContext.GetQueryableEntityType();
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(queryEntityType))
{
if (queryCompilerContext.IsEnumerableQuery())
{
return NativeExecute(_nativeEnumeratorTrackQueryExecutor, NativeEnumeratorTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
else if (queryCompilerContext.IsEntityQuery())
{
return NativeExecute(_nativeSingleTrackQueryExecutor, NativeSingleTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
}
return result;
}
return result;
}
public Task<TResult> ExecuteAsync<TResult>(IQueryCompilerContext queryCompilerContext, CancellationToken cancellationToken)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor();
if (queryCompilerExecutor == null)
{
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(mergeQueryCompilerContext);
}
throw new ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression().ShardingPrint());
}
var result = queryCompilerExecutor.GetQueryCompiler().ExecuteAsync<TResult>(queryCompilerExecutor.GetReplaceQueryExpression(), cancellationToken);
//native query
if (queryCompilerContext.IsParallelQuery()&&queryCompilerContext.IsQueryTrack())
{
var queryEntityType = queryCompilerContext.GetQueryableEntityType();
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(queryEntityType))
{
if (queryCompilerContext.IsEnumerableQuery())
{
return NativeExecute(_nativeEnumeratorTrackQueryExecutor, NativeEnumeratorTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
else if (queryCompilerContext.IsEntityQuery())
{
return NativeExecute(_nativeSingleTrackQueryExecutor, NativeSingleTrackQueryExecutorTrack,
queryCompilerContext, queryEntityType, result);
}
}
return result;
}
return result;
}
#endif
}
}

View File

@ -98,10 +98,6 @@ namespace ShardingCore.Sharding.ShardingExecutors
{ {
return _queryCompilerContext.GetShardingDbContextType(); return _queryCompilerContext.GetShardingDbContextType();
} }
public bool CurrentQueryReadConnection()
{
return _queryCompilerContext.CurrentQueryReadConnection();
}
public bool IsQueryTrack() public bool IsQueryTrack()
{ {
@ -119,7 +115,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
else else
{ {
hasQueryCompilerExecutor = IsSingleQuery(); hasQueryCompilerExecutor = IsSingleQuery();
if (hasQueryCompilerExecutor.Value&&(!IsQueryTrack()||!_existCrossTableTails)) if (hasQueryCompilerExecutor.Value)
{ {
//要么本次查询不追踪如果需要追踪不可以存在跨tails //要么本次查询不追踪如果需要追踪不可以存在跨tails
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>(); var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
@ -176,7 +172,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
/// <returns></returns> /// <returns></returns>
public bool IsParallelQuery() public bool IsParallelQuery()
{ {
return _isCrossTable || _existCrossTableTails|| CurrentQueryReadConnection(); return _isCrossTable || _existCrossTableTails|| _queryCompilerContext.IsParallelQuery();
} }
} }
} }

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries
{
public interface INativeEnumeratorTrackQueryExecutor
{
IEnumerable<TResult> Track<TResult>(IQueryCompilerContext queryCompilerContext, IEnumerable<TResult> enumerable);
IAsyncEnumerable<TResult> TrackAsync<TResult>(IQueryCompilerContext queryCompilerContext, IAsyncEnumerable<TResult> asyncEnumerable);
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries
{
public interface INativeSingleTrackQueryExecutor
{
TResult Track<TResult>(IQueryCompilerContext queryCompilerContext, TResult resultTask);
Task<TResult> TrackAsync<TResult>(IQueryCompilerContext queryCompilerContext, Task<TResult> resultTask);
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Sharding.Enumerators.TrackerEnumerables;
using ShardingCore.Sharding.Enumerators.TrackerEnumerators;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries
{
public class NativeEnumeratorTrackQueryExecutor: INativeEnumeratorTrackQueryExecutor
{
public IEnumerable<TResult> Track<TResult>(IQueryCompilerContext queryCompilerContext, IEnumerable<TResult> enumerable)
{
return new TrackEnumerable<TResult>(queryCompilerContext.GetShardingDbContext(), enumerable);
}
public IAsyncEnumerable<TResult> TrackAsync<TResult>(IQueryCompilerContext queryCompilerContext,IAsyncEnumerable<TResult> asyncEnumerable)
{
return new AsyncTrackerEnumerable<TResult>(queryCompilerContext.GetShardingDbContext(), asyncEnumerable);
}
}
}

View File

@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.ShardingExecutors.Abstractions;
namespace ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries
{
public class NativeSingleTrackQueryExecutor: INativeSingleTrackQueryExecutor
{
private readonly IQueryTracker _queryTracker;
public NativeSingleTrackQueryExecutor(IQueryTracker queryTracker)
{
_queryTracker = queryTracker;
}
public TResult Track<TResult>(IQueryCompilerContext queryCompilerContext, TResult resultTask)
{
if (resultTask != null)
{
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(resultTask.GetType()))
{
var trackedEntity = _queryTracker.Track(resultTask, queryCompilerContext.GetShardingDbContext());
if (trackedEntity != null)
{
return (TResult)trackedEntity;
}
}
}
return resultTask;
}
public async Task<TResult> TrackAsync<TResult>(IQueryCompilerContext queryCompilerContext, Task<TResult> resultTask)
{
var result = await resultTask;
return Track(queryCompilerContext, result);
}
}
}

View File

@ -28,7 +28,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
private QueryCompilerExecutor _queryCompilerExecutor; private QueryCompilerExecutor _queryCompilerExecutor;
private bool? hasQueryCompilerExecutor; private bool? hasQueryCompilerExecutor;
private bool? _isNoTracking; private bool? _isNoTracking;
private readonly bool _currentQueryReadConnection; private readonly bool _isParallelQuery;
private QueryCompilerContext( IShardingDbContext shardingDbContext, Expression queryExpression) private QueryCompilerContext( IShardingDbContext shardingDbContext, Expression queryExpression)
{ {
@ -40,7 +40,8 @@ namespace ShardingCore.Sharding.ShardingExecutors
_entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(_shardingDbContextType)); _entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(_shardingDbContextType));
_shardingConfigOption = ShardingContainer.GetRequiredShardingConfigOption(_shardingDbContextType); _shardingConfigOption = ShardingContainer.GetRequiredShardingConfigOption(_shardingDbContextType);
_currentQueryReadConnection = //原生对象的原生查询如果是读写分离就需要启用并行查询
_isParallelQuery =
_shardingConfigOption.UseReadWrite && _shardingDbContext.CurrentIsReadWriteSeparation(); _shardingConfigOption.UseReadWrite && _shardingDbContext.CurrentIsReadWriteSeparation();
} }
@ -74,9 +75,9 @@ namespace ShardingCore.Sharding.ShardingExecutors
return _shardingDbContextType; return _shardingDbContextType;
} }
public bool CurrentQueryReadConnection() public bool IsParallelQuery()
{ {
return _currentQueryReadConnection; return _isParallelQuery;
} }
public bool IsQueryTrack() public bool IsQueryTrack()
@ -105,7 +106,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
var virtualDataSource = (IVirtualDataSource)ShardingContainer.GetService( var virtualDataSource = (IVirtualDataSource)ShardingContainer.GetService(
typeof(IVirtualDataSource<>).GetGenericType0(_shardingDbContextType)); typeof(IVirtualDataSource<>).GetGenericType0(_shardingDbContextType));
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>(); var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, CurrentQueryReadConnection(), routeTailFactory.Create(string.Empty)); var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, IsParallelQuery(), routeTailFactory.Create(string.Empty));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, _queryExpression); _queryCompilerExecutor = new QueryCompilerExecutor(dbContext, _queryExpression);
} }
} }

View File

@ -244,8 +244,6 @@ namespace ShardingCore.Sharding
{ {
if (!IsParallelQuery()) if (!IsParallelQuery())
return false; return false;
if (MergeQueryCompilerContext.CurrentQueryReadConnection())
return false;
return QueryTrack() && _trackerManager.EntityUseTrack(entityType); return QueryTrack() && _trackerManager.EntityUseTrack(entityType);
} }
private bool QueryTrack() private bool QueryTrack()

View File

@ -415,6 +415,19 @@ namespace ShardingCore.Test
var list1 = await queryable.ToListAsync(); var list1 = await queryable.ToListAsync();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o=>o.DateOfMonth==202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = await queryable1.ToListAsync();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -351,6 +351,19 @@ namespace ShardingCore.Test
var list1 = queryable.ToList(); var list1 = queryable.ToList();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = queryable1.ToList();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -387,6 +387,19 @@ namespace ShardingCore.Test2x
var list1 = await queryable.ToListAsync(); var list1 = await queryable.ToListAsync();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = await queryable1.ToListAsync();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -348,6 +348,19 @@ namespace ShardingCore.Test2x
var list1 = queryable.ToList(); var list1 = queryable.ToList();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = queryable1.ToList();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -386,6 +386,19 @@ namespace ShardingCore.Test3x
var list1 = await queryable.ToListAsync(); var list1 = await queryable.ToListAsync();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = await queryable1.ToListAsync();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -350,6 +350,19 @@ namespace ShardingCore.Test3x
var list1 = queryable.ToList(); var list1 = queryable.ToList();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = queryable1.ToList();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -386,6 +386,19 @@ namespace ShardingCore.Test5x
var list1 = await queryable.ToListAsync(); var list1 = await queryable.ToListAsync();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = await queryable1.ToListAsync();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]

View File

@ -350,6 +350,20 @@ namespace ShardingCore.Test5x
var list1 = queryable.ToList(); var list1 = queryable.ToList();
Assert.Equal(24, list1.Count()); Assert.Equal(24, list1.Count());
Assert.DoesNotContain(list1, o => o.Name != "name_300"); Assert.DoesNotContain(list1, o => o.Name != "name_300");
var queryable1 = (from u in _virtualDbContext.Set<SysUserMod>().Where(o => o.Id == "300")
join salary in _virtualDbContext.Set<SysUserSalary>().Where(o => o.DateOfMonth == 202005)
on u.Id equals salary.UserId
select new
{
Salary = salary.Salary,
DateOfMonth = salary.DateOfMonth,
Name = u.Name
});
var list3 = queryable1.ToList();
Assert.Equal(1, list3.Count());
Assert.Contains(list3, o => o.Name == "name_300");
} }
[Fact] [Fact]