修复很多bug

This commit is contained in:
xuejiaming 2022-06-27 17:35:35 +08:00
parent 723dd0e7f4
commit b38bf2b236
38 changed files with 196 additions and 171 deletions

View File

@ -26,10 +26,4 @@ namespace ShardingCore.Core.DbContextCreator
/// <returns></returns>
public DbContext CreateDbContext(DbContext shellDbContext, ShardingDbContextOptions shardingDbContextOptions);
}
public interface IDbContextCreator<TShardingDbContext> : IDbContextCreator
where TShardingDbContext : DbContext, IShardingDbContext
{
}
}

View File

@ -12,8 +12,7 @@ namespace ShardingCore.Core.EntityMetadatas
/// <summary>
/// 默认分片对象元数据管理者实现
/// </summary>
/// <typeparam name="TShardingDbContext"></typeparam>
public class DefaultEntityMetadataManager<TShardingDbContext> : IEntityMetadataManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
public class DefaultEntityMetadataManager : IEntityMetadataManager
{
private readonly ConcurrentDictionary<Type, EntityMetadata> _caches =new ();
public bool AddEntityMetadata(EntityMetadata entityMetadata)

View File

@ -1,17 +1,33 @@
using System;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.DbContextCreator;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ParallelTables;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
namespace ShardingCore.Core
{
public interface IShardingRuntimeContext
{
IShardingReadWriteManager GetShardingReadWriteManager();
ITrackerManager GetTrackerManager();
IParallelTableManager GetParallelTableManager();
IDbContextCreator GetDbContextCreator();
IEntityMetadataManager GetEntityMetadataManager();
IVirtualDataSourceManager GetVirtualDataSourceManager();
IVirtualTableManager GetVirtualTableManager();
IRouteTailFactory GetRouteTailFactory();
IQueryTracker GetQueryTracker();
IUnionAllMergeManager GetUnionAllMergeManager();
IShardingPageManager GetShardingPageManager();
IShardingRuntimeModel GetShardingRuntimeModel();
IShardingRuntimeModel GetOrCreateShardingRuntimeModel(DbContext dbContext);
object GetService(Type serviceType);

View File

@ -19,12 +19,11 @@ using ShardingCore.Sharding.ShardingComparision.Abstractions;
namespace ShardingCore.Core.ShardingConfigurations.ConfigBuilders
{
public class ShardingConfigBuilder<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext
public class ShardingConfigBuilder
{
public ShardingCoreConfigBuilder<TShardingDbContext> ShardingCoreConfigBuilder { get; }
public ShardingCoreConfigBuilder ShardingCoreConfigBuilder { get; }
public ShardingConfigBuilder(ShardingCoreConfigBuilder<TShardingDbContext> shardingCoreConfigBuilder)
public ShardingConfigBuilder(ShardingCoreConfigBuilder shardingCoreConfigBuilder)
{
ShardingCoreConfigBuilder = shardingCoreConfigBuilder;
}
@ -37,19 +36,19 @@ namespace ShardingCore.Core.ShardingConfigurations.ConfigBuilders
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ShardingCoreConfigException"></exception>
/// <exception cref="ArgumentException"></exception>
public ShardingConfigBuilder<TShardingDbContext> AddConfig(Action<ShardingConfigOptions<TShardingDbContext>> shardingGlobalConfigOptionsConfigure)
public ShardingConfigBuilder AddConfig(Action<ShardingConfigOptions> shardingGlobalConfigOptionsConfigure)
{
var shardingGlobalConfigOptions = new ShardingConfigOptions<TShardingDbContext>();
var shardingGlobalConfigOptions = new ShardingConfigOptions();
shardingGlobalConfigOptionsConfigure?.Invoke(shardingGlobalConfigOptions);
if (string.IsNullOrWhiteSpace(shardingGlobalConfigOptions.ConfigId))
throw new ArgumentNullException(nameof(shardingGlobalConfigOptions.ConfigId));
if (string.IsNullOrWhiteSpace(shardingGlobalConfigOptions.DefaultDataSourceName))
throw new ArgumentNullException(
$"{nameof(shardingGlobalConfigOptions.DefaultDataSourceName)} plz call {nameof(ShardingConfigOptions<TShardingDbContext>.AddDefaultDataSource)}");
$"{nameof(shardingGlobalConfigOptions.DefaultDataSourceName)} plz call {nameof(ShardingConfigOptions.AddDefaultDataSource)}");
if (string.IsNullOrWhiteSpace(shardingGlobalConfigOptions.DefaultConnectionString))
throw new ArgumentNullException(
$"{nameof(shardingGlobalConfigOptions.DefaultConnectionString)} plz call {nameof(ShardingConfigOptions<TShardingDbContext>.AddDefaultDataSource)}");
$"{nameof(shardingGlobalConfigOptions.DefaultConnectionString)} plz call {nameof(ShardingConfigOptions.AddDefaultDataSource)}");
if (shardingGlobalConfigOptions.ConnectionStringConfigure is null&& ShardingCoreConfigBuilder.ShardingEntityConfigOptions.ConnectionStringConfigure is null)
throw new ArgumentNullException($"plz call {nameof(shardingGlobalConfigOptions.UseShardingQuery)}");

View File

@ -20,27 +20,26 @@ namespace ShardingCore.DIExtensions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingCoreConfigBuilder<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext
public class ShardingCoreConfigBuilder
{
public IServiceCollection Services { get; }
public List<ShardingConfigOptions<TShardingDbContext>> ShardingConfigOptions { get; }
public ShardingEntityConfigOptions<TShardingDbContext> ShardingEntityConfigOptions { get; }
public List<ShardingConfigOptions> ShardingConfigOptions { get; }
public ShardingEntityConfigOptions ShardingEntityConfigOptions { get; }
public ShardingCoreConfigBuilder(IServiceCollection services)
{
Services = services;
ShardingConfigOptions = new List<ShardingConfigOptions<TShardingDbContext>>();
ShardingEntityConfigOptions = new ShardingEntityConfigOptions<TShardingDbContext>();
ShardingConfigOptions = new List<ShardingConfigOptions>();
ShardingEntityConfigOptions = new ShardingEntityConfigOptions();
}
public ShardingConfigBuilder<TShardingDbContext> AddEntityConfig(Action<ShardingEntityConfigOptions<TShardingDbContext>> entityConfigure)
public ShardingConfigBuilder AddEntityConfig(Action<ShardingEntityConfigOptions> entityConfigure)
{
entityConfigure?.Invoke(ShardingEntityConfigOptions);
return new ShardingConfigBuilder<TShardingDbContext>(this);
return new ShardingConfigBuilder(this);
}
//public ShardingCoreConfigBuilder<TShardingDbContext, TActualDbContext> AddDefaultDataSource(string dataSourceName, string connectionString)
//{

View File

@ -15,7 +15,7 @@ using ShardingCore.Sharding.ParallelTables;
namespace ShardingCore.Core.ShardingConfigurations
{
public class ShardingEntityConfigOptions<TShardingDbContext> : IShardingEntityConfigOptions<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
public class ShardingEntityConfigOptions : IShardingEntityConfigOptions
{
private readonly IDictionary<Type, Type> _virtualDataSourceRoutes = new Dictionary<Type, Type>();
private readonly IDictionary<Type, Type> _virtualTableRoutes = new Dictionary<Type, Type>();

View File

@ -15,7 +15,7 @@ namespace ShardingCore.Core.TrackerManagers
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class TrackerManager<TShardingDbContext>: ITrackerManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
public class TrackerManager: ITrackerManager
{
private readonly ConcurrentDictionary<Type,bool> _dbContextModels = new ();

View File

@ -18,6 +18,7 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using ShardingCore.Utils;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
@ -44,7 +45,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
public string DefaultConnectionString { get; private set; }
public bool UseReadWriteSeparation { get; }
public VirtualDataSource(IEntityMetadataManager entityMetadataManager, IVirtualDataSourceRouteManager dataSourceRouteManager, IVirtualDataSourceConfigurationParams configurationParams)
public VirtualDataSource(IEntityMetadataManager entityMetadataManager, IVirtualDataSourceRouteManager dataSourceRouteManager, IVirtualDataSourceConfigurationParams configurationParams,IReadWriteConnectorFactory readWriteConnectorFactory)
{
Check.NotNull(configurationParams, nameof(configurationParams));
Check.NotNull(configurationParams.ExtraDataSources, nameof(configurationParams.ExtraDataSources));
@ -65,7 +66,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
if (UseReadWriteSeparation)
{
CheckReadWriteSeparation();
ConnectionStringManager = new ReadWriteConnectionStringManager(this);
ConnectionStringManager = new ReadWriteConnectionStringManager(this,readWriteConnectorFactory);
}
else
{

View File

@ -13,6 +13,7 @@ using ShardingCore.Core.VirtualDatabase.VirtualDataSources.Common;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
@ -22,12 +23,13 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
private readonly IEntityMetadataManager _entityMetadataManager;
private readonly IVirtualDataSourceRouteManager _virtualDataSourceRouteManager;
private readonly IVirtualDataSourceAccessor _virtualDataSourceAccessor;
private readonly IReadWriteConnectorFactory _readWriteConnectorFactory;
private readonly ConcurrentDictionary<string, IVirtualDataSource> _virtualDataSources = new();
private string _defaultConfigId;
private IVirtualDataSource _defaultVirtualDataSource;
public VirtualDataSourceManager(IServiceProvider serviceProvider, IShardingConfigurationOptions options, IEntityMetadataManager entityMetadataManager, IVirtualDataSourceRouteManager virtualDataSourceRouteManager, IVirtualDataSourceAccessor virtualDataSourceAccessor)
public VirtualDataSourceManager(IServiceProvider serviceProvider, IShardingConfigurationOptions options, IEntityMetadataManager entityMetadataManager, IVirtualDataSourceRouteManager virtualDataSourceRouteManager, IVirtualDataSourceAccessor virtualDataSourceAccessor,IReadWriteConnectorFactory readWriteConnectorFactory)
{
_options = options;
@ -37,6 +39,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
if (allShardingGlobalConfigOptions.IsEmpty())
throw new ArgumentException($"sharding virtual data source is empty");
_virtualDataSourceAccessor = virtualDataSourceAccessor;
_readWriteConnectorFactory = readWriteConnectorFactory;
if (options is ShardingMultiConfigurationOptions shardingMultiConfigurationOptions)
{
IsMultiShardingConfiguration = true;
@ -119,7 +122,7 @@ namespace ShardingCore.Core.VirtualDatabase.VirtualDataSources
{
if (!IsMultiShardingConfiguration&&_virtualDataSources.IsNotEmpty())
throw new NotSupportedException("not support multi sharding configuration");
var dataSource = new VirtualDataSource(_entityMetadataManager, _virtualDataSourceRouteManager, configurationParams);
var dataSource = new VirtualDataSource(_entityMetadataManager, _virtualDataSourceRouteManager, configurationParams,_readWriteConnectorFactory);
dataSource.CheckVirtualDataSource();
return _virtualDataSources.TryAdd(dataSource.ConfigId, dataSource);
}

View File

@ -91,10 +91,12 @@ namespace ShardingCore.Extensions
{
if (entities.IsEmpty())
return new Dictionary<string, Dictionary<DbContext, IEnumerable<TEntity>>>();
var shardingRuntimeContext = shardingDbContext.GetRequireService<IShardingRuntimeContext>();
var entityType = typeof(TEntity);
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
var virtualDataSource = shardingDbContext.GetVirtualDataSource();
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
var virtualTableManager =shardingRuntimeContext.GetVirtualTableManager();
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
var dataSourceNames = new Dictionary<string, Dictionary<string, BulkDicEntry<TEntity>>>();
var entitiesArray = entities as TEntity[] ?? entities.ToArray();
var isShardingDataSource = entityMetadataManager.IsShardingDataSource(entityType);
@ -118,7 +120,6 @@ namespace ShardingCore.Extensions
var bulkDicEntries = new Dictionary<string, BulkDicEntry<TEntity>>();
dataSourceNames.Add(virtualDataSource.DefaultDataSourceName, bulkDicEntries);
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContext.GetType()));
var virtualTable = virtualTableManager.GetVirtualTable(entityType);
var virtualTableRoute = virtualTable.GetVirtualRoute();
var allTails = virtualTable.GetTableAllTails().ToHashSet();
@ -139,7 +140,6 @@ namespace ShardingCore.Extensions
ISet<string> allTails = null;
if (isShardingTable)
{
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContext.GetType()));
virtualTable = virtualTableManager.GetVirtualTable(entityType);
virtualTableRoute = virtualTable.GetVirtualRoute();
allTails = virtualTable.GetTableAllTails().ToHashSet();
@ -229,7 +229,8 @@ namespace ShardingCore.Extensions
IEnumerable<TEntity> entities) where TShardingDbContext : DbContext, IShardingDbContext
where TEntity : class
{
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
var shardingRuntimeContext = shardingDbContext.GetRequireService<IShardingRuntimeContext>();
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
if (entityMetadataManager.IsShardingDataSource(typeof(TEntity)))
throw new ShardingCoreInvalidOperationException(typeof(TEntity).FullName);
//if (!entityMetadataManager.IsShardingTable(typeof(TEntity)))
@ -249,10 +250,11 @@ namespace ShardingCore.Extensions
public static IDictionary<string, IEnumerable<DbContext>> BulkShardingExpression<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext, Expression<Func<TEntity, bool>> where) where TEntity : class
where TShardingDbContext : DbContext, IShardingDbContext
{
var shardingRuntimeContext = shardingDbContext.GetRequireService<IShardingRuntimeContext>();
var virtualDataSource = shardingDbContext.GetVirtualDataSource();
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var virtualTableManager = (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContext.GetType()));
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
var virtualTableManager = shardingRuntimeContext.GetVirtualTableManager();// (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContext.GetType()));
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();// (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
var dataSourceNames = virtualDataSource.GetDataSourceNames(where);
var result = new Dictionary<string, LinkedList<DbContext>>();
@ -291,8 +293,8 @@ namespace ShardingCore.Extensions
public static IEnumerable<DbContext> BulkShardingTableExpression<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext, Expression<Func<TEntity, bool>> where) where TEntity : class
where TShardingDbContext : DbContext, IShardingDbContext
{
var entityMetadataManager = (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
var shardingRuntimeContext = shardingDbContext.GetRequireService<IShardingRuntimeContext>();
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();// (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
if (entityMetadataManager.IsShardingDataSource(typeof(TEntity)))
throw new ShardingCoreInvalidOperationException(typeof(TEntity).FullName);
return shardingDbContext.BulkShardingExpression<TShardingDbContext, TEntity>(where).First().Value;

View File

@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
@ -55,8 +57,8 @@ namespace ShardingCore.Extensions
/// <param name="readOnly">是否是读数据源</param>
private static void SetReadWriteSeparation(this ISupportShardingReadWrite supportShardingReadWrite, bool readOnly)
{
var shardingReadWriteManager = ShardingContainer.GetService<IShardingReadWriteManager>();
var shardingRuntimeContext = ((DbContext)supportShardingReadWrite).GetRequireService<IShardingRuntimeContext>();
var shardingReadWriteManager =shardingRuntimeContext.GetService<IShardingReadWriteManager>();
var shardingReadWriteContext = shardingReadWriteManager.GetCurrent(supportShardingReadWrite.GetType());
if (shardingReadWriteContext != null)
{
@ -84,7 +86,8 @@ namespace ShardingCore.Extensions
var shardingDbContextType = shardingDbContext.GetType();
if (shardingDbContext.IsUseReadWriteSeparation())
{
var shardingReadWriteManager = ShardingContainer.GetService<IShardingReadWriteManager>();
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>();
var shardingReadWriteManager =shardingRuntimeContext.GetService<IShardingReadWriteManager>();
var shardingReadWriteContext = shardingReadWriteManager.GetCurrent(shardingDbContextType);
if (shardingReadWriteContext != null)
{

View File

@ -10,6 +10,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Migrations.Operations;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -31,6 +32,7 @@ namespace ShardingCore.Helpers
{
private MigrationHelper() { }
public static void Generate<TShardingDContext>(
IShardingRuntimeContext shardingRuntimeContext,
MigrationOperation operation,
MigrationCommandListBuilder builder,
ISqlGenerationHelper sqlGenerationHelper,
@ -40,7 +42,7 @@ namespace ShardingCore.Helpers
var migrationCommands = (List<MigrationCommand>) builder.GetFieldValue("_commands");
addCmds.ForEach(aAddCmd =>
{
var shardingCmds = BuildShardingCmds<TShardingDContext>(operation, aAddCmd.CommandText, sqlGenerationHelper);
var shardingCmds = BuildShardingCmds<TShardingDContext>(shardingRuntimeContext,operation, aAddCmd.CommandText, sqlGenerationHelper);
if (shardingCmds.IsNotEmpty())
{
migrationCommands.Remove(aAddCmd);
@ -54,15 +56,16 @@ namespace ShardingCore.Helpers
});
}
private static List<string> BuildShardingCmds<TShardingDContext>(MigrationOperation operation, string sourceCmd, ISqlGenerationHelper sqlGenerationHelper)
private static List<string> BuildShardingCmds<TShardingDContext>(IShardingRuntimeContext shardingRuntimeContext,MigrationOperation operation, string sourceCmd, ISqlGenerationHelper sqlGenerationHelper)
where TShardingDContext : DbContext, IShardingDbContext
{
//所有MigrationOperation定义
//https://github.com/dotnet/efcore/tree/b970bf29a46521f40862a01db9e276e6448d3cb0/src/EFCore.Relational/Migrations/Operations
//ColumnOperation仅替换Table
//其余其余都是将Name和Table使用分表名替换
var virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDContext>>();
var virtualTableManager = shardingRuntimeContext.GetVirtualTableManager();
var allVirtualTables = virtualTableManager.GetAllVirtualTables();
var shardingRuntimeModel = shardingRuntimeContext.GetShardingRuntimeModel();
var existsShardingTables = allVirtualTables.ToDictionary(o => o.EntityMetadata.VirtualTableName, o => o.GetAllPhysicTables().Select(p=>p.FullName).ToList());
//Dictionary<string, List<string>> _existsShardingTables
// = Cache.ServiceProvider.GetService<ShardingContainer>().ExistsShardingTables;

View File

@ -17,20 +17,20 @@ namespace ShardingCore.Sharding
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ActualConnectionStringManager<TShardingDbContext> where TShardingDbContext : DbContext, IShardingDbContext
public class ActualConnectionStringManager
{
private readonly bool _useReadWriteSeparation;
private readonly IShardingReadWriteManager _shardingReadWriteManager;
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
private readonly IVirtualDataSource _virtualDataSource;
public int ReadWriteSeparationPriority { get; set; }
public bool ReadWriteSeparation { get; set; }
public ReadStrategyEnum ReadStrategy { get; set; }
public ReadConnStringGetStrategyEnum ReadConnStringGetStrategy { get; set; }
private string _cacheConnectionString;
public ActualConnectionStringManager(IVirtualDataSource<TShardingDbContext> virtualDataSource)
public ActualConnectionStringManager(IShardingReadWriteManager shardingReadWriteManager,IVirtualDataSource virtualDataSource)
{
_shardingReadWriteManager = shardingReadWriteManager;
_virtualDataSource=virtualDataSource;
_shardingReadWriteManager = ShardingContainer.GetService<IShardingReadWriteManager>();
_useReadWriteSeparation = virtualDataSource.ConnectionStringManager is ReadWriteConnectionStringManager;
if (_useReadWriteSeparation)
{
@ -67,7 +67,7 @@ namespace ShardingCore.Sharding
var support = ReadWriteSeparation;
string readNodeName = null;
var hasConfig = false;
var shardingReadWriteContext = _shardingReadWriteManager.GetCurrent<TShardingDbContext>();
var shardingReadWriteContext = _shardingReadWriteManager.GetCurrent();
if (shardingReadWriteContext != null)
{
var dbFirst = ReadWriteSeparationPriority >= shardingReadWriteContext.DefaultPriority;

View File

@ -1,6 +1,8 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -24,9 +26,10 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
public AsyncTrackerEnumerator(IShardingDbContext shardingDbContext, IAsyncEnumerator<T> asyncEnumerator)
{
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>();
_shardingDbContext = shardingDbContext;
_asyncEnumerator = asyncEnumerator;
_queryTrack = ShardingContainer.GetService<IQueryTracker>();
_queryTrack = shardingRuntimeContext.GetQueryTracker();
}
public ValueTask DisposeAsync()
{

View File

@ -1,5 +1,7 @@
using System.Collections;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.QueryTrackers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -21,9 +23,10 @@ namespace ShardingCore.Sharding.Enumerators.TrackerEnumerators
public TrackerEnumerator(IShardingDbContext shardingDbContext,IEnumerator<T> enumerator)
{
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>();
_shardingDbContext = shardingDbContext;
_enumerator = enumerator;
_queryTrack = ShardingContainer.GetService<IQueryTracker>();
_queryTrack = shardingRuntimeContext.GetQueryTracker();
}
public bool MoveNext()
{

View File

@ -4,8 +4,10 @@ using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.Internal.Visitors.Selects;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualTables;
using ShardingCore.Sharding.EntityQueryConfigurations;
using ShardingCore.Sharding.ShardingExecutors;
@ -15,6 +17,12 @@ namespace ShardingCore.Sharding.MergeContexts
{
public sealed class QueryableOptimizeEngine: IQueryableOptimizeEngine
{
private readonly IVirtualTableManager _virtualTableManager;
public QueryableOptimizeEngine(IVirtualTableManager virtualTableManager)
{
_virtualTableManager = virtualTableManager;
}
public IOptimizeResult Optimize(IMergeQueryCompilerContext mergeQueryCompilerContext, IParseResult parseResult,
IQueryable rewriteQueryable)
{
@ -27,8 +35,7 @@ namespace ShardingCore.Sharding.MergeContexts
if (mergeQueryCompilerContext.IsSingleShardingEntityQuery() && mergeQueryCompilerContext.IsCrossTable() && !mergeQueryCompilerContext.UseUnionAllMerge())
{
var singleShardingEntityType = mergeQueryCompilerContext.GetSingleShardingEntityType();
var virtualTableManager = ShardingContainer.GetVirtualTableManager(mergeQueryCompilerContext.GetShardingDbContextType());
var virtualTable = virtualTableManager.GetVirtualTable(singleShardingEntityType);
var virtualTable = _virtualTableManager.GetVirtualTable(singleShardingEntityType);
if (virtualTable.EnableEntityQuery)
{
if (virtualTable.EntityQueryMetadata.DefaultTailComparer != null)

View File

@ -8,6 +8,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.UnionAllMergeShardingProviders.Abstractions;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
@ -49,7 +50,7 @@ namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
if (UseUnionAllMerge())
{
var customerDatabaseSqlSupportManager = ShardingContainer.GetService<IUnionAllMergeManager>();
var customerDatabaseSqlSupportManager = GetStreamMergeContext().ShardingRuntimeContext.GetUnionAllMergeManager();
using (customerDatabaseSqlSupportManager.CreateScope(
((UnSupportSqlRouteUnit)dataSourceSqlExecutorUnit.SqlExecutorGroups[0].Groups[0]
.RouteUnit).TableRouteResults))

View File

@ -19,7 +19,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
private readonly IShardingPageManager _shardingPageManager;
public CountAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_shardingPageManager =streamMergeContext.ShardingRuntimeContext.GetShardingPageManager();
}
protected override int DoMergeResult(List<RouteQueryResult<int>> resultList)

View File

@ -33,14 +33,14 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
private readonly StreamMergeContext _streamMergeContext;
private readonly IShardingPageManager _shardingPageManager;
private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager;
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IEntityMetadataManager _entityMetadataManager;
private EnumeratorStreamMergeEngineFactory(StreamMergeContext streamMergeContext)
{
_streamMergeContext = streamMergeContext;
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
_entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<TShardingDbContext>>();
_shardingPageManager = streamMergeContext.ShardingRuntimeContext.GetShardingPageManager();
_virtualTableManager =streamMergeContext.ShardingRuntimeContext.GetVirtualTableManager();
_entityMetadataManager = streamMergeContext.ShardingRuntimeContext.GetEntityMetadataManager();
}
public static EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity> Create(StreamMergeContext streamMergeContext)

View File

@ -24,7 +24,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
private readonly IShardingPageManager _shardingPageManager;
public LongCountAsyncInMemoryMergeEngine(StreamMergeContext streamMergeContext) : base(streamMergeContext)
{
_shardingPageManager= ShardingContainer.GetService<IShardingPageManager>();
_shardingPageManager=streamMergeContext.ShardingRuntimeContext.GetShardingPageManager();
}
protected override long DoMergeResult(List<RouteQueryResult<long>> resultList)

View File

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ParallelTables
{
public interface IParallelTableManager<TShardingDbContext> : IParallelTableManager
where TShardingDbContext : DbContext, IShardingDbContext
{
}
}

View File

@ -9,8 +9,7 @@ using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.ParallelTables
{
public sealed class ParallelTableManager<TShardingDbContext> : IParallelTableManager<TShardingDbContext>
where TShardingDbContext : DbContext, IShardingDbContext
public sealed class ParallelTableManager : IParallelTableManager
{
private readonly ISet<ParallelTableGroupNode> _parallelTableConfigs = new HashSet<ParallelTableGroupNode>();
public bool AddParallelTable(ParallelTableGroupNode parallelTableGroupNode)

View File

@ -8,6 +8,7 @@ using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Query;
using ShardingCore.Core;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingQueryableExtensions;
@ -41,7 +42,8 @@ namespace ShardingCore.Sharding.Parsers.Visitors
public ShardingQueryPrepareVisitor(IShardingDbContext shardingDbContext)
{
_shardingDbContext = shardingDbContext;
_trackerManager = ShardingContainer.GetTrackerManager(shardingDbContext.GetType());
_trackerManager = ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>()
.GetTrackerManager();
}
public ShardingPrepareResult GetShardingPrepareResult()
{

View File

@ -15,12 +15,9 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations.Abstractions
*/
public interface IShardingReadWriteManager
{
ShardingReadWriteContext GetCurrent<TShardingDbContext>()
where TShardingDbContext : DbContext, IShardingDbContext;
ShardingReadWriteContext GetCurrent(Type shardingDbContextType);
ShardingReadWriteContext GetCurrent();
ShardingReadWriteScope<TShardingDbContext> CreateScope<TShardingDbContext>()
where TShardingDbContext : DbContext, IShardingDbContext;
ShardingReadWriteScope CreateScope();
}
}

View File

@ -22,12 +22,11 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
private readonly IVirtualDataSource _virtualDataSource;
public ReadWriteConnectionStringManager(IVirtualDataSource virtualDataSource)
public ReadWriteConnectionStringManager(IVirtualDataSource virtualDataSource,IReadWriteConnectorFactory readWriteConnectorFactory)
{
_virtualDataSource = virtualDataSource;
var readWriteConnectorFactory = ShardingContainer.GetService<IReadWriteConnectorFactory>();
var readWriteConnectors = virtualDataSource.ConfigurationParams.ReadWriteNodeSeparationConfigs.Select(o=> readWriteConnectorFactory.CreateConnector(virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(), o.Key,o.Value));
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault());
_shardingConnectionStringResolver = new ReadWriteShardingConnectionStringResolver(readWriteConnectors, virtualDataSource.ConfigurationParams.ReadStrategy.GetValueOrDefault(),readWriteConnectorFactory);
}
public string GetConnectionString(string dataSourceName)
{

View File

@ -16,7 +16,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
private readonly IReadWriteConnectorFactory _readWriteConnectorFactory;
private readonly ReaderWriterLockSlim _readerWriterLock = new ReaderWriterLockSlim();
public ReadWriteShardingConnectionStringResolver(IEnumerable<IReadWriteConnector> connectors, ReadStrategyEnum readStrategy)
public ReadWriteShardingConnectionStringResolver(IEnumerable<IReadWriteConnector> connectors, ReadStrategyEnum readStrategy,IReadWriteConnectorFactory readWriteConnectorFactory)
{
_readStrategy = readStrategy;
var enumerator = connectors.GetEnumerator();
@ -27,7 +27,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
_connectors.TryAdd(currentConnector.DataSourceName, currentConnector);
}
_readWriteConnectorFactory = ShardingContainer.GetService<IReadWriteConnectorFactory>();
_readWriteConnectorFactory = readWriteConnectorFactory;
}
public bool ContainsReadWriteDataSourceName(string dataSourceName)

View File

@ -20,32 +20,22 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
*/
public class ShardingReadWriteManager:IShardingReadWriteManager
{
private readonly ConcurrentDictionary<Type, IShardingReadWriteAccessor> _shardingReadWriteAccessors;
private readonly IShardingReadWriteAccessor _shardingReadWriteAccessor;
public ShardingReadWriteManager(IEnumerable<IShardingReadWriteAccessor> shardingReadWriteAccessors)
public ShardingReadWriteManager(IShardingReadWriteAccessor shardingReadWriteAccessor)
{
_shardingReadWriteAccessors = new ConcurrentDictionary<Type, IShardingReadWriteAccessor>(shardingReadWriteAccessors.ToDictionary(o => o.ShardingDbContextType, o => o));
}
public ShardingReadWriteContext GetCurrent<TShardingDbContext>() where TShardingDbContext : DbContext, IShardingDbContext
{
return GetCurrent(typeof(TShardingDbContext));
_shardingReadWriteAccessor = shardingReadWriteAccessor;
}
public ShardingReadWriteContext GetCurrent(Type shardingDbContextType)
public ShardingReadWriteContext GetCurrent()
{
if (!shardingDbContextType.IsShardingDbContext())
throw new ShardingCoreInvalidOperationException(shardingDbContextType.FullName);
if (_shardingReadWriteAccessors.TryGetValue(shardingDbContextType, out var accessor))
return accessor.ShardingReadWriteContext;
throw new ShardingCoreInvalidOperationException(shardingDbContextType.FullName);
return _shardingReadWriteAccessor.ShardingReadWriteContext;
}
public ShardingReadWriteScope<TShardingDbContext> CreateScope<TShardingDbContext>() where TShardingDbContext : DbContext, IShardingDbContext
public ShardingReadWriteScope CreateScope()
{
var shardingPageScope = new ShardingReadWriteScope<TShardingDbContext>(_shardingReadWriteAccessors.Select(o => o.Value));
var shardingPageScope = new ShardingReadWriteScope(_shardingReadWriteAccessor);
shardingPageScope.ShardingReadWriteAccessor.ShardingReadWriteContext = ShardingReadWriteContext.Create();
return shardingPageScope;
}

View File

@ -15,8 +15,7 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingReadWriteScope<TShardingDbContext>:IDisposable
where TShardingDbContext : DbContext, IShardingDbContext
public class ShardingReadWriteScope:IDisposable
{
public IShardingReadWriteAccessor ShardingReadWriteAccessor { get; }
@ -24,10 +23,10 @@ namespace ShardingCore.Sharding.ReadWriteConfigurations
/// <summary>
/// 构造函数
/// </summary>
/// <param name="shardingReadWriteAccessors"></param>
public ShardingReadWriteScope(IEnumerable<IShardingReadWriteAccessor> shardingReadWriteAccessors)
/// <param name="shardingReadWriteAccessor"></param>
public ShardingReadWriteScope(IShardingReadWriteAccessor shardingReadWriteAccessor)
{
ShardingReadWriteAccessor = shardingReadWriteAccessors.FirstOrDefault(o=>o.ShardingDbContextType==typeof(TShardingDbContext))??throw new ArgumentNullException(nameof(shardingReadWriteAccessors));
ShardingReadWriteAccessor = shardingReadWriteAccessor;
}
/// <summary>

View File

@ -29,14 +29,14 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class DataSourceDbContext<TShardingDbContext> : IDataSourceDbContext
where TShardingDbContext : DbContext, IShardingDbContext
public class DataSourceDbContext : IDataSourceDbContext
{
private static readonly ILogger<DataSourceDbContext<TShardingDbContext>> _logger =
InternalLoggerFactory.CreateLogger<DataSourceDbContext<TShardingDbContext>>();
private static readonly ILogger<DataSourceDbContext> _logger =
InternalLoggerFactory.CreateLogger<DataSourceDbContext>();
private static readonly IComparer<string> _comparer = new NoShardingFirstComparer();
public Type DbContextType { get; }
/// <summary>
/// 当前是否是默认的dbcontext 也就是不分片的dbcontext
/// </summary>
@ -50,17 +50,17 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// <summary>
/// dbcontext 创建接口
/// </summary>
private readonly IDbContextCreator<TShardingDbContext> _dbContextCreator;
private readonly IDbContextCreator _dbContextCreator;
/// <summary>
/// 实际的链接字符串管理者 用来提供查询和插入dbcontext的创建链接的获取
/// </summary>
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
private readonly ActualConnectionStringManager _actualConnectionStringManager;
/// <summary>
/// 当前的数据源是什么默认单数据源可以支持多数据源配置
/// </summary>
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
private readonly IVirtualDataSource _virtualDataSource;
/// <summary>
/// 数据源名称
@ -92,7 +92,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// <summary>
/// 同库下公用一个db context options
/// </summary>
private DbContextOptions<TShardingDbContext> _dbContextOptions;
private DbContextOptions _dbContextOptions;
/// <summary>
/// 是否触发了并发如果是的话就报错
@ -115,14 +115,14 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public DataSourceDbContext(string dataSourceName,
bool isDefault,
DbContext shardingShellDbContext,
IDbContextCreator<TShardingDbContext> dbContextCreator,
ActualConnectionStringManager<TShardingDbContext> actualConnectionStringManager)
IDbContextCreator dbContextCreator,
ActualConnectionStringManager actualConnectionStringManager)
{
DataSourceName = dataSourceName;
IsDefault = isDefault;
_shardingShellDbContext = shardingShellDbContext;
_virtualDataSource =
(IVirtualDataSource<TShardingDbContext>)((IShardingDbContext)shardingShellDbContext)
DbContextType = shardingShellDbContext.GetType();
_virtualDataSource =((IShardingDbContext)shardingShellDbContext)
.GetVirtualDataSource();
_dbContextCreator = dbContextCreator;
_actualConnectionStringManager = actualConnectionStringManager;
@ -132,7 +132,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// 创建共享的数据源配置用来做事务 不支持并发后期发现直接报错
/// </summary>
/// <returns></returns>
private DbContextOptions<TShardingDbContext> CreateShareDbContextOptionsBuilder()
private DbContextOptions CreateShareDbContextOptionsBuilder()
{
if (_dbContextOptions != null)
{
@ -149,7 +149,7 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
try
{
//先创建dbcontext option builder
var dbContextOptionsBuilder = CreateDbContextOptionBuilder();
var dbContextOptionsBuilder = CreateDbContextOptionBuilder(DbContextType);
if (IsDefault)
{
@ -186,11 +186,11 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
}
public static DbContextOptionsBuilder<TShardingDbContext> CreateDbContextOptionBuilder()
public static DbContextOptionsBuilder CreateDbContextOptionBuilder(Type dbContextType)
{
Type type = typeof(DbContextOptionsBuilder<>);
type = type.MakeGenericType(typeof(TShardingDbContext));
return (DbContextOptionsBuilder<TShardingDbContext>)Activator.CreateInstance(type);
type = type.MakeGenericType(dbContextType);
return (DbContextOptionsBuilder)Activator.CreateInstance(type);
}
/// <summary>

View File

@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
@ -29,18 +30,18 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
/// DbContext执行者
/// </summary>
/// <typeparam name="TShardingDbContext"></typeparam>
public class ShardingDbContextExecutor<TShardingDbContext> : IShardingDbContextExecutor where TShardingDbContext : DbContext, IShardingDbContext
public class ShardingDbContextExecutor : IShardingDbContextExecutor
{
private readonly DbContext _shardingDbContext;
//private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>> _dbContextCaches = new ConcurrentDictionary<string, ConcurrentDictionary<string, DbContext>>();
private readonly ConcurrentDictionary<string, IDataSourceDbContext> _dbContextCaches = new ConcurrentDictionary<string, IDataSourceDbContext>();
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager;
private readonly IDbContextCreator<TShardingDbContext> _dbContextCreator;
private readonly IVirtualDataSource _virtualDataSource;
private readonly IVirtualTableManager _virtualTableManager;
private readonly IDbContextCreator _dbContextCreator;
private readonly IRouteTailFactory _routeTailFactory;
private readonly ActualConnectionStringManager<TShardingDbContext> _actualConnectionStringManager;
private readonly IEntityMetadataManager<TShardingDbContext> _entityMetadataManager;
private readonly ActualConnectionStringManager _actualConnectionStringManager;
private readonly IEntityMetadataManager _entityMetadataManager;
public int ReadWriteSeparationPriority
{
@ -59,19 +60,22 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
public ShardingDbContextExecutor(DbContext shardingDbContext)
{
_shardingDbContext = shardingDbContext;
_virtualDataSource = ShardingContainer.GetRequiredCurrentVirtualDataSource<TShardingDbContext>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
_dbContextCreator = ShardingContainer.GetService<IDbContextCreator<TShardingDbContext>>();
_entityMetadataManager = ShardingContainer.GetService<IEntityMetadataManager<TShardingDbContext>>();
_routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
_actualConnectionStringManager = new ActualConnectionStringManager<TShardingDbContext>(_virtualDataSource);
var shardingRuntimeContext = shardingDbContext.GetRequireService<IShardingRuntimeContext>();
var virtualDataSourceManager = shardingRuntimeContext.GetVirtualDataSourceManager();
_virtualDataSource = virtualDataSourceManager.GetCurrentVirtualDataSource();
_virtualTableManager = shardingRuntimeContext.GetVirtualTableManager();
_dbContextCreator = shardingRuntimeContext.GetDbContextCreator();
_entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
_routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
var shardingReadWriteManager = shardingRuntimeContext.GetShardingReadWriteManager();
_actualConnectionStringManager = new ActualConnectionStringManager(shardingReadWriteManager,_virtualDataSource);
}
#region create db context
private IDataSourceDbContext GetDataSourceDbContext(string dataSourceName)
{
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext<TShardingDbContext>(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext, _dbContextCreator, _actualConnectionStringManager));
return _dbContextCaches.GetOrAdd(dataSourceName, dsname => new DataSourceDbContext(dsname, _virtualDataSource.IsDefault(dsname), _shardingDbContext, _dbContextCreator, _actualConnectionStringManager));
}
/// <summary>
@ -97,9 +101,9 @@ namespace ShardingCore.Sharding.ShardingDbContextExecutors
}
}
private DbContextOptions<TShardingDbContext> CreateParallelDbContextOptions(string dataSourceName)
private DbContextOptions CreateParallelDbContextOptions(string dataSourceName)
{
var dbContextOptionBuilder = DataSourceDbContext<TShardingDbContext>.CreateDbContextOptionBuilder();
var dbContextOptionBuilder = DataSourceDbContext.CreateDbContextOptionBuilder(_shardingDbContext.GetType());
var connectionString = _actualConnectionStringManager.GetConnectionString(dataSourceName, false);
_virtualDataSource.UseDbContextOptionsBuilder(connectionString, dbContextOptionBuilder);
return dbContextOptionBuilder.Options;

View File

@ -19,7 +19,7 @@ namespace ShardingCore.ShardingExecutors
{
private readonly ShardingRouteScope _shardingRouteScope;
private readonly bool _hasCustomerQuery;
public CustomerQueryScope(IPrepareParseResult prepareParseResult)
public CustomerQueryScope(IPrepareParseResult prepareParseResult,IShardingRouteManager shardingRouteManager)
{
_hasCustomerQuery = prepareParseResult.HasCustomerQuery();
if (_hasCustomerQuery)
@ -27,7 +27,6 @@ namespace ShardingCore.ShardingExecutors
var asRoute = prepareParseResult.GetAsRoute();
if ( asRoute!= null)
{
var shardingRouteManager = ShardingContainer.GetService<IShardingRouteManager>();
_shardingRouteScope = shardingRouteManager.CreateScope();
asRoute.Invoke(shardingRouteManager.Current);
}

View File

@ -4,6 +4,7 @@ using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.Extensions;
using ShardingCore.Logger;
using ShardingCore.Sharding.Parsers.Abstractions;
@ -22,20 +23,23 @@ namespace ShardingCore.Sharding.ShardingExecutors
private readonly IShardingTrackQueryExecutor _shardingTrackQueryExecutor;
private readonly IQueryCompilerContextFactory _queryCompilerContextFactory;
private readonly IPrepareParser _prepareParser;
private readonly IShardingRouteManager _shardingRouteManager;
public DefaultShardingCompilerExecutor(
IShardingTrackQueryExecutor shardingTrackQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory,IPrepareParser prepareParser)
IShardingTrackQueryExecutor shardingTrackQueryExecutor, IQueryCompilerContextFactory queryCompilerContextFactory,IPrepareParser prepareParser,
IShardingRouteManager shardingRouteManager)
{
_shardingTrackQueryExecutor = shardingTrackQueryExecutor;
_queryCompilerContextFactory = queryCompilerContextFactory;
_prepareParser = prepareParser;
_shardingRouteManager = shardingRouteManager;
}
public TResult Execute<TResult>(IShardingDbContext shardingDbContext, Expression query)
{
//预解析表达式
var prepareParseResult = _prepareParser.Parse(shardingDbContext,query);
_logger.LogDebug($"compile parameter:{prepareParseResult}");
using (new CustomerQueryScope(prepareParseResult))
using (new CustomerQueryScope(prepareParseResult,_shardingRouteManager))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.Execute<TResult>(queryCompilerContext);
@ -53,7 +57,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
var prepareParseResult = _prepareParser.Parse(shardingDbContext, query);
_logger.LogDebug($"compile parameter:{prepareParseResult}");
using (new CustomerQueryScope(prepareParseResult))
using (new CustomerQueryScope(prepareParseResult,_shardingRouteManager))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext);
@ -67,7 +71,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
//预解析表达式
var prepareParseResult = _prepareParser.Parse(shardingDbContext, query);
_logger.LogDebug($"compile parameter:{prepareParseResult}");
using (new CustomerQueryScope(prepareParseResult))
using (new CustomerQueryScope(prepareParseResult,_shardingRouteManager))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext);
@ -80,7 +84,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
//预解析表达式
var prepareParseResult = _prepareParser.Parse(shardingDbContext, query);
_logger.LogDebug($"compile parameter:{prepareParseResult}");
using (new CustomerQueryScope(prepareParseResult))
using (new CustomerQueryScope(prepareParseResult,_shardingRouteManager))
{
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
return _shardingTrackQueryExecutor.ExecuteAsync<TResult>(queryCompilerContext, cancellationToken);

View File

@ -31,8 +31,13 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
public class DefaultShardingQueryExecutor : IShardingQueryExecutor
{
private static readonly ILogger<DefaultShardingQueryExecutor> _logger=InternalLoggerFactory.CreateLogger<DefaultShardingQueryExecutor>();
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
public DefaultShardingQueryExecutor(IStreamMergeContextFactory streamMergeContextFactory)
{
_streamMergeContextFactory = streamMergeContextFactory;
}
public TResult Execute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
//如果根表达式为tolist toarray getenumerator等表示需要迭代
@ -109,8 +114,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
private StreamMergeContext GetStreamMergeContext(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
var streamMergeContextFactory = (IStreamMergeContextFactory)ShardingContainer.GetService(typeof(IStreamMergeContextFactory<>).GetGenericType0(mergeQueryCompilerContext.GetShardingDbContextType()));
return streamMergeContextFactory.Create(mergeQueryCompilerContext);
return _streamMergeContextFactory.Create(mergeQueryCompilerContext);
}
private TResult EnumerableExecute<TResult>(IMergeQueryCompilerContext mergeQueryCompilerContext)

View File

@ -48,11 +48,13 @@ namespace ShardingCore.Sharding.ShardingExecutors
private readonly IShardingQueryExecutor _shardingQueryExecutor;
private readonly INativeTrackQueryExecutor _nativeTrackQueryExecutor;
private readonly ITrackerManager _trackerManager;
public DefaultShardingTrackQueryExecutor(IShardingQueryExecutor shardingQueryExecutor, INativeTrackQueryExecutor nativeTrackQueryExecutor)
public DefaultShardingTrackQueryExecutor(IShardingQueryExecutor shardingQueryExecutor, INativeTrackQueryExecutor nativeTrackQueryExecutor,ITrackerManager trackerManager)
{
_shardingQueryExecutor = shardingQueryExecutor;
_nativeTrackQueryExecutor = nativeTrackQueryExecutor;
_trackerManager = trackerManager;
}
public TResult Execute<TResult>(IQueryCompilerContext queryCompilerContext)
{
@ -81,10 +83,8 @@ namespace ShardingCore.Sharding.ShardingExecutors
{
var queryEntityType = queryCompilerContext.GetQueryableEntityType();
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(queryEntityType))
if (_trackerManager.EntityUseTrack(queryEntityType))
{
if (queryCompilerContext.IsEnumerableQuery())
{

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core;
using ShardingCore.Core.EntityMetadatas;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
@ -22,6 +23,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
{
private readonly IParallelTableManager _parallelTableManager;
private readonly IShardingRuntimeContext _shardingRuntimeContext;
private readonly IQueryCompilerContext _queryCompilerContext;
private readonly QueryCombineResult _queryCombineResult;
private readonly DataSourceRouteResult _dataSourceRouteResult;
@ -44,11 +46,12 @@ namespace ShardingCore.Sharding.ShardingExecutors
private QueryCompilerExecutor _queryCompilerExecutor;
private bool? hasQueryCompilerExecutor;
private MergeQueryCompilerContext(IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult, DataSourceRouteResult dataSourceRouteResult, IEnumerable<TableRouteResult> tableRouteResults)
private MergeQueryCompilerContext(IShardingRuntimeContext shardingRuntimeContext,IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult, DataSourceRouteResult dataSourceRouteResult, IEnumerable<TableRouteResult> tableRouteResults)
{
_shardingRuntimeContext = shardingRuntimeContext;
_queryCompilerContext = queryCompilerContext;
_queryCombineResult = queryCombineResult;
_parallelTableManager = (IParallelTableManager)ShardingContainer.GetService(typeof(IParallelTableManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
_parallelTableManager = _shardingRuntimeContext.GetParallelTableManager();
_dataSourceRouteResult = dataSourceRouteResult;
_tableRouteResults = GetTableRouteResults(tableRouteResults).ToArray();
_isCrossDataSource = dataSourceRouteResult.IntersectDataSources.Count > 1;
@ -73,7 +76,9 @@ namespace ShardingCore.Sharding.ShardingExecutors
public static MergeQueryCompilerContext Create(IQueryCompilerContext queryCompilerContext, QueryCombineResult queryCombineResult, DataSourceRouteResult dataSourceRouteResult,IEnumerable<TableRouteResult> tableRouteResults)
{
return new MergeQueryCompilerContext(queryCompilerContext, queryCombineResult,dataSourceRouteResult, tableRouteResults);
var shardingDbContext = queryCompilerContext.GetShardingDbContext();
var shardingRuntimeContext = ((DbContext)shardingDbContext).GetRequireService<IShardingRuntimeContext>();
return new MergeQueryCompilerContext(shardingRuntimeContext,queryCompilerContext, queryCombineResult,dataSourceRouteResult, tableRouteResults);
}
public Dictionary<Type,IQueryable> GetQueryEntities()
{
@ -154,7 +159,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
if (hasQueryCompilerExecutor.Value)
{
//要么本次查询不追踪如果需要追踪不可以存在跨tails
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var dbContext = GetShardingDbContext().GetDbContext(_dataSourceRouteResult.IntersectDataSources.First(), IsParallelQuery(), routeTailFactory.Create(_tableRouteResults[0]));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, GetQueryExpression());
}

View File

@ -14,19 +14,20 @@ namespace ShardingCore.Sharding.ShardingExecutors.NativeTrackQueries
public class NativeTrackQueryExecutor : INativeTrackQueryExecutor
{
private readonly IQueryTracker _queryTracker;
public NativeTrackQueryExecutor(IQueryTracker queryTracker)
private readonly ITrackerManager _trackerManager;
public NativeTrackQueryExecutor(IQueryTracker queryTracker,ITrackerManager trackerManager)
{
_queryTracker = queryTracker;
_trackerManager = trackerManager;
}
public TResult Track<TResult>(IQueryCompilerContext queryCompilerContext, TResult resultTask)
{
if (resultTask != null)
{
var trackerManager =
(ITrackerManager)ShardingContainer.GetService(
typeof(ITrackerManager<>).GetGenericType0(queryCompilerContext.GetShardingDbContextType()));
if (trackerManager.EntityUseTrack(resultTask.GetType()))
if (_trackerManager.EntityUseTrack(resultTask.GetType()))
{
var trackedEntity = _queryTracker.Track(resultTask, queryCompilerContext.GetShardingDbContext());
if (trackedEntity != null)

View File

@ -19,6 +19,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
{
private readonly Dictionary<Type/* 查询对象类型 */, IQueryable/* 查询对象对应的表达式 */> _queryEntities;
private readonly IShardingDbContext _shardingDbContext;
private readonly IShardingRuntimeContext _shardingRuntimeContext;
private readonly Expression _queryExpression;
private readonly IEntityMetadataManager _entityMetadataManager;
private readonly Type _shardingDbContextType;
@ -34,6 +35,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
private QueryCompilerContext(IPrepareParseResult prepareParseResult)
{
_shardingRuntimeContext = ((DbContext)prepareParseResult.GetShardingDbContext()).GetRequireService<IShardingRuntimeContext>();
_shardingDbContext = prepareParseResult.GetShardingDbContext();
_queryExpression = prepareParseResult.GetNativeQueryExpression();
_shardingDbContextType = _shardingDbContext.GetType();
@ -43,7 +45,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
_useUnionAllMerge = prepareParseResult.UseUnionAllMerge();
_maxQueryConnectionsLimit = prepareParseResult.GetMaxQueryConnectionsLimit();
_connectionMode = prepareParseResult.GetConnectionMode();
_entityMetadataManager = ShardingContainer.GetRequiredEntityMetadataManager(_shardingDbContextType);
_entityMetadataManager = _shardingRuntimeContext.GetEntityMetadataManager();
//原生对象的原生查询如果是读写分离就需要启用并行查询
_isParallelQuery = prepareParseResult.ReadOnly().GetValueOrDefault();
@ -145,7 +147,7 @@ namespace ShardingCore.Sharding.ShardingExecutors
if (hasQueryCompilerExecutor.Value)
{
var virtualDataSource = _shardingDbContext.GetVirtualDataSource();
var routeTailFactory = ShardingContainer.GetService<IRouteTailFactory>();
var routeTailFactory = _shardingRuntimeContext.GetRouteTailFactory();
var dbContext = _shardingDbContext.GetDbContext(virtualDataSource.DefaultDataSourceName, IsParallelQuery(), routeTailFactory.Create(string.Empty));
_queryCompilerExecutor = new QueryCompilerExecutor(dbContext, _queryExpression);
}

View File

@ -36,6 +36,7 @@ namespace ShardingCore.Sharding
#endif
{
public IMergeQueryCompilerContext MergeQueryCompilerContext { get; }
public IShardingRuntimeContext ShardingRuntimeContext{ get; }
public IParseResult ParseResult { get; }
public IQueryable RewriteQueryable { get; }
public IOptimizeResult OptimizeResult { get; }
@ -85,6 +86,8 @@ namespace ShardingCore.Sharding
IRouteTailFactory routeTailFactory,ITrackerManager trackerManager,IShardingEntityConfigOptions shardingEntityConfigOptions)
{
MergeQueryCompilerContext = mergeQueryCompilerContext;
ShardingRuntimeContext = ((DbContext)mergeQueryCompilerContext.GetShardingDbContext())
.GetRequireService<IShardingRuntimeContext>();
ParseResult = parseResult;
RewriteQueryable = rewriteQueryable;
OptimizeResult = optimizeResult;