support firstordefaultasync

This commit is contained in:
xuejiaming 2021-08-17 22:17:18 +08:00
parent f3aa01e999
commit f4620a4bc4
31 changed files with 910 additions and 811 deletions

View File

@ -26,8 +26,13 @@ namespace Sample.SqlServer.Controllers
[HttpGet]
public async Task<IActionResult> Get()
{
var result = await _defaultTableDbContext.Set<SysUserMod>().OrderBy(o=>o.Age).ToListAsync();
var resultx = await _defaultTableDbContext.Set<SysUserMod>().Where(o => o.Id == "198").FirstOrDefaultAsync();
var result = await _defaultTableDbContext.Set<SysUserMod>().ToListAsync();
var sysUserMod98 = result.FirstOrDefault(o => o.Id == "98");
sysUserMod98.Name = "name_update"+new Random().Next(1,99)+"_98";
await _defaultTableDbContext.SaveChangesAsync();
return Ok(result);
}
}
}
}

View File

@ -20,5 +20,6 @@ namespace Sample.SqlServer.DbContexts
modelBuilder.ApplyConfiguration(new SysUserModMap());
modelBuilder.ApplyConfiguration(new SysTestMap());
}
}
}

View File

@ -5,17 +5,26 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Sample.SqlServer.DbContexts;
using Sample.SqlServer.Shardings;
using ShardingCore;
using ShardingCore.EFCores;
using ShardingCore.SqlServer;
namespace Sample.SqlServer
{
public class Startup
{
public static readonly ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
@ -23,8 +32,8 @@ namespace Sample.SqlServer
services.AddControllers();
services.AddShardingSqlServer(o =>
{
o.EnsureCreatedWithOutShardingTable = true;
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = false;
o.CreateShardingTableOnStart = false;
o.UseShardingDbContext<DefaultTableDbContext>( dbConfig =>
{
dbConfig.AddShardingTableRoute<SysUserModVirtualTableRoute>();
@ -35,8 +44,17 @@ namespace Sample.SqlServer
services.AddDbContext<DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True"));
services.AddDbContext<DefaultShardingDbContext>(o =>
o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;").UseSharding());
services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(op =>
{
op.UseShardingDbContextOptions((connection, builder) =>
{
return builder.UseSqlServer(connection).UseLoggerFactory(efLogger)
.UseQueryTrackingBehavior(QueryTrackingBehavior.TrackAll)
.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>()
.ReplaceService<IModelCustomizer, ShardingModelCustomizer>().Options;
});
},o =>
o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;MultipleActiveResultSets=True;").UseSharding());
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

View File

@ -1,21 +1,14 @@
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Update;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.VirtualRoutes;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.VirtualDbContexts;
using ShardingCore.Extensions;
using ShardingCore.EFCores;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Query;
using ShardingCore.SqlServer.EFCores;
using ShardingCore.TableCreator;
using System;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Query.Sql;
@ -62,7 +55,7 @@ namespace ShardingCore.SqlServer
public static DbContextOptionsBuilder UseSharding(this DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.ReplaceService<IDbSetSource, ShardingDbSetSource>().ReplaceService<IAsyncQueryProvider, ShardingEntityQueryProvider>();
optionsBuilder.ReplaceService<IDbSetSource, ShardingDbSetSource>().ReplaceService<IQueryCompiler, ShardingQueryCompiler>();
return optionsBuilder;
}
}

View File

@ -21,4 +21,10 @@
<ProjectReference Include="..\ShardingCore\ShardingCore.csproj" />
</ItemGroup>
<ItemGroup>
<Reference Include="Microsoft.Extensions.Logging.Console">
<HintPath>C:\Program Files\dotnet\packs\Microsoft.AspNetCore.App.Ref\5.0.0\ref\net5.0\Microsoft.Extensions.Logging.Console.dll</HintPath>
</Reference>
</ItemGroup>
</Project>

View File

@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using ShardingCore.DbContexts.VirtualDbContexts;
using ShardingCore.DbContexts.VirtualDbContexts.ShareDbContextOptionsProviders;
using ShardingCore.EFCores;
@ -26,25 +27,25 @@ namespace ShardingCore.SqlServer
*/
public class SqlServerDbContextOptionsProvider : IDbContextOptionsProvider
{
private readonly ILoggerFactory _loggerFactory;
private readonly IShardingCoreOptions _shardingCoreOptions;
public SqlServerDbContextOptionsProvider(ILoggerFactory loggerFactory, IShardingCoreOptions shardingCoreOptions)
public static readonly ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});
public SqlServerDbContextOptionsProvider(IShardingCoreOptions shardingCoreOptions)
{
_loggerFactory = loggerFactory;
_shardingCoreOptions = shardingCoreOptions;
}
public DbContextOptions GetDbContextOptions(DbConnection dbConnection)
{
Console.WriteLine("create new dbcontext options,dbconnection is new:"+(dbConnection==null));
var track = dbConnection != null;
var connection = dbConnection ?? GetSqlConnection();
var dbContextOptions = CreateDbContextOptionBuilder()
.UseSqlServer(connection)
.UseLoggerFactory(_loggerFactory)
.UseLoggerFactory(efLogger)
.IfDo(!track, o => o.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking))
//.IfDo(isQuery,o=>o.ReplaceService<IQueryCompiler, ShardingQueryCompiler>())
.ReplaceService<IModelCacheKeyFactory, ShardingModelCacheKeyFactory>()

View File

@ -1,4 +1,5 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.Internal.StreamMerge;
using ShardingCore.Core.ShardingAccessors;
@ -7,20 +8,21 @@ using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.Abstractions;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.TableCreator;
namespace ShardingCore
{
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 13:32:18
* @Email: 326308290@qq.com
*/
/*
* @Author: xjm
* @Description:
* @Date: Thursday, 28 January 2021 13:32:18
* @Email: 326308290@qq.com
*/
public static class DIExtension
{
public static IServiceCollection AddShardingCore(this IServiceCollection services)
{
services.AddSingleton<IDbContextCreateFilterManager, DbContextCreateFilterManager>();
@ -39,5 +41,29 @@ namespace ShardingCore
services.AddSingleton<IShardingScopeFactory, ShardingScopeFactory>();
return services;
}
public static IServiceCollection AddShardingDbContext<TShardingDbContext, TActualDbContext>(this IServiceCollection services,
Action<ShardingConfig<TActualDbContext>> configure,
Action<DbContextOptionsBuilder> optionsAction = null,
ServiceLifetime contextLifetime = ServiceLifetime.Scoped,
ServiceLifetime optionsLifetime = ServiceLifetime.Scoped)
where TActualDbContext : DbContext, IShardingTableDbContext
where TShardingDbContext : DbContext
{
if (configure == null)
throw new ArgumentNullException($"AddScfSqlServerProvider 参数不能为空:{nameof(configure)}");
var shardingConfig = new ShardingConfig<TActualDbContext>();
configure?.Invoke(shardingConfig);
services.AddSingleton(shardingConfig);
services.AddDbContext<TShardingDbContext>(optionsAction, contextLifetime, optionsLifetime);
services.AddShardingCore();
services.AddSingleton<IShardingBootstrapper, ShardingBootstrapper>();
return services;
}
}
}

View File

@ -1,15 +1,26 @@
#if !EFCORE2
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.EFCores
{
@ -19,281 +30,132 @@ namespace ShardingCore.EFCores
* Authorxuejiaming
* Created: 2020/12/28 13:58:46
**/
public class ShardingQueryCompiler: QueryCompiler
{
private readonly IQueryContextFactory _queryContextFactory;
public class ShardingQueryCompiler: IQueryCompiler
{
private readonly IQueryContextFactory _queryContextFactory;
private readonly IDatabase _database;
private readonly IDiagnosticsLogger<DbLoggerCategory.Query> _logger;
private readonly IModel _model;
private readonly ICurrentDbContext _currentContext;
private readonly IModel _model;
private readonly IStreamMergeContextFactory _streamMergeContextFactory;
public ShardingQueryCompiler(IQueryContextFactory queryContextFactory, ICompiledQueryCache compiledQueryCache, ICompiledQueryCacheKeyGenerator compiledQueryCacheKeyGenerator, IDatabase database, IDiagnosticsLogger<DbLoggerCategory.Query> logger, ICurrentDbContext currentContext, IEvaluatableExpressionFilter evaluatableExpressionFilter, IModel model) : base(queryContextFactory, compiledQueryCache, compiledQueryCacheKeyGenerator, database, logger, currentContext, evaluatableExpressionFilter, model)
public ShardingQueryCompiler(IQueryContextFactory queryContextFactory, ICompiledQueryCache compiledQueryCache, ICompiledQueryCacheKeyGenerator compiledQueryCacheKeyGenerator, IDatabase database, IDiagnosticsLogger<DbLoggerCategory.Query> logger, ICurrentDbContext currentContext, IEvaluatableExpressionFilter evaluatableExpressionFilter, IModel model)
{
_queryContextFactory = queryContextFactory;
_database = database;
_logger = logger;
_model = model;
}
public override TResult Execute<TResult>(Expression query)
{
var shardingAccessor = ShardingContainer.Services.GetService<IShardingAccessor>();
if (shardingAccessor?.ShardingContext != null)
{
return ShardingExecute<TResult>(query);
}
return base.Execute<TResult>(query);
}
/// <summary>
/// use no compiler
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="query"></param>
/// <returns></returns>
private TResult ShardingExecute<TResult>(Expression query)
{
var queryContext = _queryContextFactory.Create();
query = ExtractParameters(query, queryContext, _logger);
var compiledQuery
= CompileQueryCore<TResult>(_database, query, _model, false);
return compiledQuery(queryContext);
}
public override TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var shardingAccessor = ShardingContainer.Services.GetService<IShardingAccessor>();
if (shardingAccessor?.ShardingContext != null)
{
var result= ShardingExecuteAsync<TResult>(query, cancellationToken);
return result;
}
return base.ExecuteAsync<TResult>(query, cancellationToken);
}
private TResult ShardingExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var queryContext = _queryContextFactory.Create();
queryContext.CancellationToken = cancellationToken;
query = ExtractParameters(query, queryContext, _logger);
var compiledQuery
= CompileQueryCore<TResult>(_database, query, _model, true);
return compiledQuery(queryContext);
}
}
}
#endif
_currentContext = currentContext;
_model = model;
_streamMergeContextFactory = ShardingContainer.GetService<IStreamMergeContextFactory>();
}
#if EFCORE2
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore;
using ShardingCore.Core.ShardingAccessors;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Remotion.Linq.Clauses.StreamedData;
private ICurrentDbContext GetCurrentDbContext()
{
return _currentContext;
}
public TResult Execute<TResult>(Expression query)
{
throw new NotImplementedException();
}
namespace ShardingCore.EFCores
{
/**
*
*
* Authorxuejiaming
* Created: 2020/12/28 13:58:46
**/
public class ShardingQueryCompiler: QueryCompiler
{
private static MethodInfo CompileQueryMethod { get; }
= typeof(IDatabase).GetTypeInfo()
.GetDeclaredMethod(nameof(IDatabase.CompileQuery));
private readonly IQueryContextFactory _queryContextFactory;
private readonly ICompiledQueryCache _compiledQueryCache;
private readonly ICompiledQueryCacheKeyGenerator _compiledQueryCacheKeyGenerator;
private readonly IDatabase _database;
private readonly IDiagnosticsLogger<DbLoggerCategory.Query> _logger;
private readonly IQueryModelGenerator _queryModelGenerator;
private readonly Type _contextType;
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var currentDbContext = GetCurrentDbContext().Context;
public ShardingQueryCompiler(IQueryContextFactory queryContextFactory, ICompiledQueryCache compiledQueryCache, ICompiledQueryCacheKeyGenerator compiledQueryCacheKeyGenerator, IDatabase database, IDiagnosticsLogger<DbLoggerCategory.Query> logger, ICurrentDbContext currentContext, IQueryModelGenerator queryModelGenerator) : base(queryContextFactory, compiledQueryCache, compiledQueryCacheKeyGenerator, database, logger, currentContext, queryModelGenerator)
{
_queryContextFactory = queryContextFactory;
_compiledQueryCache = compiledQueryCache;
_compiledQueryCacheKeyGenerator = compiledQueryCacheKeyGenerator;
_database = database;
_logger = logger;
_queryModelGenerator = queryModelGenerator;
_contextType = currentContext.Context.GetType();
}
public override TResult Execute<TResult>(Expression query)
{
var shardingAccessor = ShardingContainer.Services.GetService<IShardingAccessor>();
if (shardingAccessor?.ShardingContext != null)
{
return ShardingExecute<TResult>(query);
}
return base.Execute<TResult>(query);
}
private TResult ShardingExecute<TResult>(Expression query)
{
var queryContext = _queryContextFactory.Create();
query = _queryModelGenerator.ExtractParameters(_logger, query, queryContext);
var compiledQuery
= CompileQueryCore<TResult>(query, _queryModelGenerator, _database, _logger, _contextType);
return compiledQuery(queryContext);
}
public override IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
var shardingAccessor = ShardingContainer.Services.GetService<IShardingAccessor>();
if (shardingAccessor?.ShardingContext != null)
{
return ShardingExecuteEnumerableAsync<TResult>(query);
}
return base.ExecuteAsync<TResult>(query);
}
private IAsyncEnumerable<TResult> ShardingExecuteEnumerableAsync<TResult>(Expression query)
{
var queryContext = _queryContextFactory.Create();
query = _queryModelGenerator.ExtractParameters(_logger, query, queryContext);
return CompileAsyncQueryCore<TResult>(query,_queryModelGenerator, _database)(queryContext);
}
public override Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var shardingAccessor = ShardingContainer.Services.GetService<IShardingAccessor>();
if (shardingAccessor?.ShardingContext != null)
if (currentDbContext is IShardingDbContext shardingDbContext)
{
return ShardingExecuteAsync<TResult>(query, cancellationToken);
if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
{
var queryEntityType = typeof(TResult).GetGenericArguments()[0];
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new[] { queryable,shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
if (typeof(TResult).HasImplementedRawGeneric(typeof(Task<>)))
{
if (query is MethodCallExpression methodCallExpression)
{
var queryEntityType = query.Type;
Type type = typeof(IQueryable<>);
type = type.MakeGenericType(queryEntityType);
var rootQuery = methodCallExpression.Arguments.FirstOrDefault(o=>o.Type==type);
switch (methodCallExpression.Method.Name)
{
case nameof(Enumerable.FirstOrDefault): return FirstOrDefaultAsync<TResult>(shardingDbContext, queryEntityType, rootQuery, queryable =>
(TResult)(typeof(ShardingEntityFrameworkQueryableExtensions).GetMethod(nameof(ShardingEntityFrameworkQueryableExtensions.ShardingFirstOrDefaultAsync))
.MakeGenericMethod(new Type[]
{
queryEntityType
}).Invoke(null, new object[] { queryable, cancellationToken })), cancellationToken);
//, BindingFlags.Static | BindingFlags.Public);.InvokeMember(, System.Reflection.BindingFlags.InvokeMethod | System.Reflection.BindingFlags.Static
//| System.Reflection.BindingFlags.Public, null, null, new object[] { queryable, cancellationToken }), cancellationToken
}
}
return default;
}
throw new ShardingCoreException($"db context operator not support query expression:[{query}] result type:[{typeof(TResult).FullName}]");
//IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
//var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
//var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
//return streamMergeEngine.GetAsyncEnumerator();
}
return base.ExecuteAsync<TResult>(query, cancellationToken);
}
private Task<TResult> ShardingExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
var queryContext = _queryContextFactory.Create();
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
queryContext.CancellationToken = cancellationToken;
query = _queryModelGenerator.ExtractParameters(_logger, query, queryContext);
private TResult FirstOrDefaultAsync<TResult>(IShardingDbContext shardingDbContext,Type queryEntityType, Expression query,Func<IQueryable, TResult> efQuery, CancellationToken cancellationToken)
{
var compiledQuery = CompileAsyncQueryCore<TResult>(query,_queryModelGenerator, _database);
Type type = typeof(EnumerableQuery<>);
type = type.MakeGenericType(queryEntityType);
var queryable = Activator.CreateInstance(type, query);
return ExecuteSingletonAsyncQuery(queryContext, compiledQuery, _logger, _contextType);
}
private static Func<QueryContext, TResult> CompileQueryCore<TResult>(
Expression query,
IQueryModelGenerator queryModelGenerator,
IDatabase database,
IDiagnosticsLogger<DbLoggerCategory.Query> logger,
Type contextType)
{
var queryModel = queryModelGenerator.ParseQuery(query);
var streamMergeContextMethod = _streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(_streamMergeContextFactory, new object[] { queryable, shardingDbContext });
var resultItemType
= (queryModel.GetOutputDataInfo()
as StreamedSequenceInfo)?.ResultItemType
?? typeof(TResult);
Type streamMergeEngineType = typeof(FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
var streamEngineMethod = streamMergeEngineType.GetMethod("ExecuteAsync");
if (streamEngineMethod == null)
throw new ShardingCoreException("cant found InMemoryAsyncStreamMergeEngine method [ExecuteAsync]");
return (TResult)streamEngineMethod.Invoke(streamEngine, new object[] { efQuery, cancellationToken });
}
if (resultItemType == typeof(TResult))
{
var compiledQuery = database.CompileQuery<TResult>(queryModel);
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
return qc =>
{
try
{
return compiledQuery(qc).First();
}
catch (Exception exception)
{
logger.QueryIterationFailed(contextType, exception);
throw;
}
};
}
try
{
return (Func<QueryContext, TResult>)CompileQueryMethod
.MakeGenericMethod(resultItemType)
.Invoke(database, new object[] { queryModel });
}
catch (TargetInvocationException e)
{
ExceptionDispatchInfo.Capture(e.InnerException).Throw();
throw;
}
}
private static Func<QueryContext, IAsyncEnumerable<TResult>> CompileAsyncQueryCore<TResult>(
Expression query,
IQueryModelGenerator queryModelGenerator,
IDatabase database)
{
var queryModel = queryModelGenerator.ParseQuery(query);
return database.CompileAsyncQuery<TResult>(queryModel);
}
private static async Task<TResult> ExecuteSingletonAsyncQuery<TResult>(
QueryContext queryContext,
Func<QueryContext, IAsyncEnumerable<TResult>> compiledQuery,
IDiagnosticsLogger<DbLoggerCategory.Query> logger,
Type contextType)
{
try
{
var asyncEnumerable = compiledQuery(queryContext);
using (var asyncEnumerator = asyncEnumerable.GetEnumerator())
{
await asyncEnumerator.MoveNext(queryContext.CancellationToken);
return asyncEnumerator.Current;
}
}
catch (Exception exception)
{
logger.QueryIterationFailed(contextType, exception);
throw;
}
}
}
}
#endif
public Func<QueryContext, TResult> CreateCompiledAsyncQuery<TResult>(Expression query)
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 9:43:00
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public static class GenericExtension
{
public static Type[] GetGenericArguments(this Type type, Type genericType)
{
return type.GetInterfaces() //取类型的接口
.Where(i => IsGenericType(i)) //筛选出相应泛型接口
.SelectMany(i => i.GetGenericArguments()) //选择所有接口的泛型参数
.ToArray(); //ToArray
bool IsGenericType(Type type1)
=> type1.IsGenericType && type1.GetGenericTypeDefinition() == genericType;
}
public static bool HasImplementedRawGeneric(this Type type, Type generic)
{
if (type == null) throw new ArgumentNullException(nameof(type));
if (generic == null) throw new ArgumentNullException(nameof(generic));
// 测试接口。
var isTheRawGenericType = type.GetInterfaces().Any(IsTheRawGenericType);
if (isTheRawGenericType) return true;
// 测试类型。
while (type != null && type != typeof(object))
{
isTheRawGenericType = IsTheRawGenericType(type);
if (isTheRawGenericType) return true;
type = type.BaseType;
}
// 没有找到任何匹配的接口或类型。
return false;
// 测试某个类型是否是指定的原始接口。
bool IsTheRawGenericType(Type test)
=> generic == (test.IsGenericType ? test.GetGenericTypeDefinition() : test);
}
}
}

View File

@ -40,7 +40,8 @@ namespace ShardingCore.Extensions
/// <returns></returns>
internal static IQueryable<T> RemoveTake<T>(this IQueryable<T> source)
{
return (IQueryable<T>) source.Provider.CreateQuery(new RemoveTakeVisitor().Visit(source.Expression));
var expression = new RemoveTakeVisitor().Visit(source.Expression);
return (IQueryable<T>) source.Provider.CreateQuery(expression);
}
/// <summary>

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Query;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 15:51:32
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
internal static class ShardingEntityFrameworkQueryableExtensions
{
public static Task<TSource> ShardingFirstOrDefaultAsync<TSource>(
this IQueryable<TSource> source,
CancellationToken cancellationToken = default(CancellationToken))
{
return EntityFrameworkQueryableExtensions.FirstOrDefaultAsync(source, cancellationToken);
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ShardingCore.Extensions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 21:42:37
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public static class ShardingQueryaleExtension
{
public static IQueryable<TElement> AsShardingQueryable<TElement>(
this IEnumerable<TElement> source)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
return source is IQueryable<TElement> queryable ? queryable : (IQueryable<TElement>)new EnumerableQuery<TElement>(source);
}
}
}

View File

@ -183,5 +183,13 @@ namespace ShardingCore.Helpers
var func = outer.Compile();
return func;
}
//public static TResult ShardingExecuteAsyncEnumerabe(Expression expression)
//{
//}
}
}

View File

@ -8,10 +8,13 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.EFCores;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
@ -27,28 +30,67 @@ namespace ShardingCore.Sharding
/// 分表分库的dbcontext
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class AbstractShardingDbContext<T> : DbContext, IShardingDbContext where T : DbContext
public abstract class AbstractShardingDbContext<T> : DbContext, IShardingDbContext where T : DbContext, IShardingTableDbContext
{
private readonly string EMPTY_SHARDING_TAIL_ID = Guid.NewGuid().ToString("n");
private readonly ConcurrentDictionary<string, DbContext> _dbContextCaches = new ConcurrentDictionary<string, DbContext>();
private readonly IVirtualTableManager _virtualTableManager;
private readonly IShardingDbContextFactory _shardingDbContextFactory;
private readonly ShardingConfig<T> _shardingConfig;
private DbContextOptions<T> _dbContextOptions;
private readonly object CREATELOCK = new object();
public AbstractShardingDbContext(DbContextOptions options) : base(options)
{
_shardingDbContextFactory = ShardingContainer.GetService<IShardingDbContextFactory>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager>();
_shardingConfig = ShardingContainer.GetService<ShardingConfig<T>>();
}
public Type ActualDbContextType => typeof(T);
private DbContextOptionsBuilder<T> CreateDbContextOptionBuilder()
{
Type type = typeof(DbContextOptionsBuilder<>);
type = type.MakeGenericType(ActualDbContextType);
return (DbContextOptionsBuilder<T>)Activator.CreateInstance(type);
}
private DbContextOptions<T> GetDbContextOptions()
{
var dbContextOptionBuilder = CreateDbContextOptionBuilder();
var dbConnection = Database.GetDbConnection();
dbConnection.Open();
return _shardingConfig.ShardingDbContextOptionsCreator(dbConnection, dbContextOptionBuilder);
}
private ShardingDbContextOptions CreateSameShardingDbContextOptions(string tail)
{
if (_dbContextOptions == null)
{
lock (CREATELOCK)
{
if (_dbContextOptions == null)
{
_dbContextOptions = GetDbContextOptions();
}
}
}
return new ShardingDbContextOptions(_dbContextOptions, tail);
}
public DbContext GetDbContext(bool track, string tail)
{
if (!_dbContextCaches.TryGetValue(tail, out var dbContext))
{
dbContext = _shardingDbContextFactory.Create(track ? this.Database.GetDbConnection() : null, tail == EMPTY_SHARDING_TAIL_ID ? string.Empty : tail); ;
dbContext = _shardingDbContextFactory.Create(CreateSameShardingDbContextOptions(tail == EMPTY_SHARDING_TAIL_ID ? string.Empty : tail));
_dbContextCaches.TryAdd(tail, dbContext);
}
//if (IsOpenTransaction)
//{
// _dbTransaction.Use(dbContext);
//}
return dbContext;
}
@ -338,7 +380,12 @@ namespace ShardingCore.Sharding
finally
{
if (!isBeginTransaction)
Database.CurrentTransaction.Dispose();
Database.CurrentTransaction?.Dispose();
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(null);
}
}
return i;
}
@ -368,7 +415,12 @@ namespace ShardingCore.Sharding
finally
{
if (!isBeginTransaction)
Database.CurrentTransaction.Dispose();
Database.CurrentTransaction?.Dispose();
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(null);
}
}
return i;
}
@ -390,15 +442,21 @@ namespace ShardingCore.Sharding
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken);
i +=await dbContextCache.Value.SaveChangesAsync(cancellationToken);
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
}
if (!isBeginTransaction)
await Database.CurrentTransaction.CommitAsync(cancellationToken);
}
finally
{
if (!isBeginTransaction)
if (!isBeginTransaction) { }
if (Database.CurrentTransaction != null)
await Database.CurrentTransaction.DisposeAsync();
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken);
}
}
return i;
}
@ -420,7 +478,7 @@ namespace ShardingCore.Sharding
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.Database.UseTransactionAsync(Database.CurrentTransaction.GetDbTransaction(), cancellationToken: cancellationToken);
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess,cancellationToken);
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
if (!isBeginTransaction)
await Database.CurrentTransaction.CommitAsync(cancellationToken);
@ -428,7 +486,13 @@ namespace ShardingCore.Sharding
finally
{
if (!isBeginTransaction)
await Database.CurrentTransaction.DisposeAsync();
if (Database.CurrentTransaction != null)
await Database.CurrentTransaction.DisposeAsync();
foreach (var dbContextCache in _dbContextCaches)
{
await dbContextCache.Value.Database.UseTransactionAsync(null, cancellationToken: cancellationToken);
}
}
return i;
}

View File

@ -13,6 +13,10 @@ namespace ShardingCore.Sharding.Abstractions
*/
public interface IShardingDbContext
{
/// <summary>
/// 真实的DbContext 类型
/// </summary>
Type ActualDbContextType { get;}
DbContext GetDbContext(bool track,string tail);
DbContext CreateGenericDbContext<T>(T entity) where T : class;

View File

@ -13,6 +13,6 @@ namespace ShardingCore.Sharding.Abstractions
public interface IStreamMergeEngine<T>
{
Task<IStreamMergeAsyncEnumerator<T>> GetAsyncEnumerator();
Task<IStreamMergeEnumerator<T>> GetEnumerator();
IStreamMergeEnumerator<T> GetEnumerator();
}
}

View File

@ -1,103 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.ShardingAccessors;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com
*/
public class GenericStreamMergeEngine<T> : IStreamMergeEngine<T>,IDisposable
{
private readonly StreamMergeContext<T> _mergeContext;
private readonly ICollection<DbContext> _parallelDbContexts;
public GenericStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
_parallelDbContexts = new LinkedList<DbContext>();
}
public static IStreamMergeEngine<T> Create<T>(StreamMergeContext<T> mergeContext)
{
return new GenericStreamMergeEngine<T>(mergeContext);
}
public async Task<IStreamMergeAsyncEnumerator<T>> GetAsyncEnumerator()
{
var tableResult = _mergeContext.GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
return Task.Run(async () =>
{
try
{
using (var scope = _mergeContext.CreateScope())
{
var shardingContext = ShardingContext.Create(routeResult);
scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail);
_parallelDbContexts.Add(shardingDbContext);
var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = await Task.WhenAll(enumeratorTasks);
if (_mergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
}
public Task<IStreamMergeEnumerator<T>> GetEnumerator()
{
throw new NotImplementedException();
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public void Dispose()
{
_parallelDbContexts.ForEach(o => o.Dispose());
}
}
}

View File

@ -1,98 +1,131 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Query;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Extensions;
//using System;
//using System.Collections.Generic;
//using System.Diagnostics.CodeAnalysis;
//using System.Linq;
//using System.Linq.Expressions;
//using System.Reflection;
//using System.Threading;
//using Microsoft.EntityFrameworkCore.Infrastructure;
//using Microsoft.EntityFrameworkCore.Query;
//using Microsoft.EntityFrameworkCore.Query.Internal;
//using ShardingCore.Exceptions;
//using ShardingCore.Extensions;
//using ShardingCore.Sharding.Abstractions;
//using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.Query
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 14:25:27
* @Email: 326308290@qq.com
*/
public class ShardingEntityQueryProvider: IAsyncQueryProvider
{
private static readonly MethodInfo _genericCreateQueryMethod
= typeof(ShardingEntityQueryProvider).GetRuntimeMethods()
.Single(m => (m.Name == "CreateQuery") && m.IsGenericMethod);
//namespace ShardingCore.Sharding.Query
//{
///*
//* @Author: xjm
//* @Description:
//* @Date: Saturday, 14 August 2021 14:25:27
//* @Email: 326308290@qq.com
//*/
// public class ShardingEntityQueryProvider: IAsyncQueryProvider
// {
// private static readonly MethodInfo _genericCreateQueryMethod
// = typeof(ShardingEntityQueryProvider).GetRuntimeMethods()
// .Single(m => (m.Name == "CreateQuery") && m.IsGenericMethod);
private readonly MethodInfo _genericExecuteMethod;
// private readonly MethodInfo _genericExecuteMethod;
private readonly IQueryCompiler _queryCompiler;
private readonly ICurrentDbContext _currentDbContext;
// private readonly IQueryCompiler _queryCompiler;
// private readonly ICurrentDbContext _currentDbContext;
// private readonly IStreamMergeContextFactory _streamMergeContextFactory;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public ShardingEntityQueryProvider(IQueryCompiler queryCompiler,ICurrentDbContext currentDbContext)
{
_queryCompiler = queryCompiler;
_currentDbContext = currentDbContext;
_genericExecuteMethod = queryCompiler.GetType()
.GetRuntimeMethods()
.Single(m => (m.Name == "Execute") && m.IsGenericMethod);
}
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public ShardingEntityQueryProvider(IQueryCompiler queryCompiler,ICurrentDbContext currentDbContext)
// {
// _queryCompiler = queryCompiler;
// _currentDbContext = currentDbContext;
// _genericExecuteMethod = queryCompiler.GetType()
// .GetRuntimeMethods()
// .Single(m => (m.Name == "Execute") && m.IsGenericMethod);
// _streamMergeContextFactory = ShardingContainer.GetService<IStreamMergeContextFactory>();
// }
public ICurrentDbContext GetCurrentDbContext()
{
return _currentDbContext;
}
// public ICurrentDbContext GetCurrentDbContext()
// {
// return _currentDbContext;
// }
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual IQueryable<TElement> CreateQuery<TElement>(Expression expression)
=> new ShardingEntityQueryable<TElement>(this, expression);
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public virtual IQueryable<TElement> CreateQuery<TElement>(Expression expression)
// => new ShardingEntityQueryable<TElement>(this, expression);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual IQueryable CreateQuery(Expression expression)
=> (IQueryable)_genericCreateQueryMethod
.MakeGenericMethod(expression.Type.GetSequenceType())
.Invoke(this, new object[] { expression });
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public virtual IQueryable CreateQuery(Expression expression)
// => (IQueryable)_genericCreateQueryMethod
// .MakeGenericMethod(expression.Type.GetSequenceType())
// .Invoke(this, new object[] { expression });
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual TResult Execute<TResult>(Expression expression)
=> throw new NotSupportedException();
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public virtual TResult Execute<TResult>(Expression expression)
// => throw new NotSupportedException();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual object Execute(Expression expression)
=> throw new NotSupportedException();
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public virtual object Execute(Expression expression)
// => throw new NotSupportedException();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual TResult ExecuteAsync<TResult>(Expression expression, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
}
}
// /// <summary>
// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
// /// the same compatibility standards as public APIs. It may be changed or removed without notice in
// /// any release. You should only use it directly in your code with extreme caution and knowing that
// /// doing so can result in application failures when updating to a new Entity Framework Core release.
// /// </summary>
// public virtual TResult ExecuteAsync<TResult>(Expression expression, CancellationToken cancellationToken = default)
// {
// var currentDbContext = GetCurrentDbContext();
// if (currentDbContext is IShardingDbContext shardingDbContext)
// {
// if (typeof(TResult).HasImplementedRawGeneric(typeof(IAsyncEnumerable<>)))
// {
// var constructors
// = typeof(EnumerableQuery<>).GetTypeInfo().DeclaredConstructors
// .Where(c => !c.IsStatic && c.IsPublic)
// .ToArray();
// var parameters = constructors[0].GetParameters();
// var parameterType = parameters[0].ParameterType;
// Type type = typeof(EnumerableQuery<>);
// type = type.MakeGenericType(ActualDbContextType);
// Activator.CreateInstance(type);
// }
// IQueryable<TResult> queryable = new EnumerableQuery<TResult>(expression);
// var streamMergeContext = _streamMergeContextFactory.Create(queryable, shardingDbContext);
// var streamMergeEngine = AsyncEnumerableStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
// return streamMergeEngine.GetAsyncEnumerator();
// }
// throw new ShardingCoreException("db context is not IShardingDbContext");
// }
// }
//}

View File

@ -16,19 +16,19 @@ using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.Query
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 14:17:30
* @Email: 326308290@qq.com
*/
public class ShardingEntityQueryable<TResult>: IOrderedQueryable<TResult>,
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 14:17:30
* @Email: 326308290@qq.com
*/
public class ShardingEntityQueryable<TResult> : IOrderedQueryable<TResult>,
IAsyncEnumerable<TResult>,
IListSource
{
private readonly IAsyncQueryProvider _queryProvider;
private readonly DbContext _dbContext;
//private readonly DbContext _dbContext;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
@ -36,7 +36,7 @@ namespace ShardingCore.Sharding.Query
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public ShardingEntityQueryable(IAsyncQueryProvider queryProvider,IEntityType entityType)
public ShardingEntityQueryable(IAsyncQueryProvider queryProvider, IEntityType entityType)
: this(queryProvider, new QueryRootExpression(queryProvider, entityType))
{
}
@ -51,10 +51,10 @@ namespace ShardingCore.Sharding.Query
{
Check.NotNull(queryProvider, nameof(queryProvider));
Check.NotNull(expression, nameof(expression));
if (queryProvider is ShardingEntityQueryProvider shardingEntityQueryProvider)
{
_dbContext = shardingEntityQueryProvider.GetCurrentDbContext().Context;
}
//if (queryProvider is ShardingEntityQueryProvider shardingEntityQueryProvider)
//{
// _dbContext = shardingEntityQueryProvider.GetCurrentDbContext().Context;
//}
_queryProvider = queryProvider;
Expression = expression;
@ -63,20 +63,11 @@ namespace ShardingCore.Sharding.Query
public IEnumerator<TResult> GetEnumerator()
{
throw new NotImplementedException();
}
public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
if (_dbContext is IShardingDbContext shardingDbContext)
{
IQueryable<TResult> queryable = new EnumerableQuery<TResult>(Expression);
var streamMergeContext = ShardingContainer.GetService<IStreamMergeContextFactory>().Create(queryable, shardingDbContext);
var streamMergeEngine = GenericStreamMergeEngine<TResult>.Create<TResult>(streamMergeContext);
return streamMergeEngine.GetAsyncEnumerator().GetAwaiter().GetResult();
}
throw new ShardingCoreException("db context is not IShardingDbContext");
return _queryProvider.ExecuteAsync<IAsyncEnumerable<TResult>>(Expression).GetAsyncEnumerator(cancellationToken);
}
IEnumerator IEnumerable.GetEnumerator()
@ -86,13 +77,13 @@ namespace ShardingCore.Sharding.Query
public Type ElementType => typeof(TResult);
public Expression Expression { get; }
public IQueryProvider Provider => _queryProvider;
public IQueryProvider Provider => _queryProvider;
public IList GetList()
{
throw new NotSupportedException(CoreStrings.DataBindingWithIListSource);
}
public bool ContainsListCollection => false;
public bool ContainsListCollection => false;
}
}

View File

@ -29,7 +29,17 @@ namespace ShardingCore.Core.Internal.StreamMerge.ReWrite
var orders = extraEntry.Orders ?? Enumerable.Empty<PropertyOrder>();
//去除分页,获取前Take+Skip数量
var reWriteQueryable = _queryable.RemoveTake().RemoveSkip();
var reWriteQueryable = _queryable;
if (take.HasValue)
{
reWriteQueryable = _queryable.RemoveTake();
}
if (skip.HasValue)
{
reWriteQueryable = _queryable.RemoveSkip();
}
if (take.HasValue)
reWriteQueryable = reWriteQueryable.Take(take.Value + skip.GetValueOrDefault());
//包含group by

View File

@ -14,10 +14,10 @@ using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Query.Internal;
using ShardingCore.Core;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Internal;
using ShardingCore.Sharding.Query;
namespace ShardingCore.Sharding
{
@ -28,173 +28,15 @@ namespace ShardingCore.Sharding
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingInternalDbSet<TEntity> :
DbSet<TEntity>,
IQueryable<TEntity>,
IAsyncEnumerable<TEntity>,
IInfrastructure<IServiceProvider>,
IResettableService
public class ShardingInternalDbSet<TEntity> :InternalDbSet<TEntity>
where TEntity : class
{
private readonly DbContext _context;
private readonly string _entityTypeName;
private IEntityType _entityType;
private ShardingEntityQueryable<TEntity> _entityQueryable;
private LocalView<TEntity> _localView;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public ShardingInternalDbSet(DbContext context, string entityTypeName)
public ShardingInternalDbSet(DbContext context, string entityTypeName) : base(context, entityTypeName)
{
Check.NotNull(context, nameof(context));
// Just storing context/service locator here so that the context will be initialized by the time the
// set is used and services will be obtained from the correctly scoped container when this happens.
_context = context;
_entityTypeName = entityTypeName;
}
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override IEntityType EntityType
{
get
{
if (_entityType != null)
{
return _entityType;
}
_entityType = _entityTypeName != null
? _context.Model.FindEntityType(_entityTypeName)
: _context.Model.FindEntityType(typeof(TEntity));
if (_entityType == null)
{
if (_context.Model.HasEntityTypeWithDefiningNavigation(typeof(TEntity)))
{
throw new InvalidOperationException(
CoreStrings.InvalidSetTypeWeak(typeof(TEntity).ShortDisplayName()));
}
if (_context.Model.IsShared(typeof(TEntity)))
{
throw new InvalidOperationException(
CoreStrings.InvalidSetSharedType(typeof(TEntity).ShortDisplayName()));
}
throw new InvalidOperationException(CoreStrings.InvalidSetType(typeof(TEntity).ShortDisplayName()));
}
if (_entityType.IsOwned())
{
_entityType = null;
throw new InvalidOperationException(
CoreStrings.InvalidSetTypeOwned(typeof(TEntity).ShortDisplayName()));
}
if (_entityType.ClrType != typeof(TEntity))
{
var message = CoreStrings.DbSetIncorrectGenericType(
_entityType.ShortName(), _entityType.ClrType.ShortDisplayName(),
typeof(TEntity).ShortDisplayName());
_entityType = null;
throw new InvalidOperationException(message);
}
return _entityType;
}
}
private void CheckState()
{
// ReSharper disable once AssignmentIsFullyDiscarded
_ = EntityType;
}
private void CheckKey()
{
if (EntityType.FindPrimaryKey() == null)
{
throw new InvalidOperationException(
CoreStrings.InvalidSetKeylessOperation(typeof(TEntity).ShortDisplayName()));
}
}
private ShardingEntityQueryable<TEntity> ShardingEntityQueryable
{
get
{
CheckState();
return NonCapturingLazyInitializer.EnsureInitialized(
ref _entityQueryable,
this,
internalSet => internalSet.CreateEntityQueryable());
}
}
private ShardingEntityQueryable<TEntity> CreateEntityQueryable()
=> new ShardingEntityQueryable<TEntity>(_context.GetDependencies().QueryProvider, EntityType);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override LocalView<TEntity> Local
{
get
{
CheckKey();
if (_context.ChangeTracker.AutoDetectChangesEnabled)
{
_context.ChangeTracker.DetectChanges();
}
return _localView ??= new LocalView<TEntity>(this);
}
}
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override TEntity Find(params object[] keyValues)
=> Finder.Find(keyValues);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override ValueTask<TEntity> FindAsync(params object[] keyValues)
=> Finder.FindAsync(keyValues);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override ValueTask<TEntity> FindAsync(object[] keyValues, CancellationToken cancellationToken)
=> Finder.FindAsync(keyValues, cancellationToken);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
@ -514,94 +356,5 @@ namespace ShardingCore.Sharding
}
}
private IEntityFinder<TEntity> Finder
=> (IEntityFinder<TEntity>)_context.GetDependencies().EntityFinderFactory.Create(EntityType);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IEnumerator<TEntity> IEnumerable<TEntity>.GetEnumerator()
=> ShardingEntityQueryable.GetEnumerator();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
=> ShardingEntityQueryable.GetEnumerator();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IAsyncEnumerator<TEntity> IAsyncEnumerable<TEntity>.GetAsyncEnumerator(CancellationToken cancellationToken)
=> ShardingEntityQueryable.GetAsyncEnumerator(cancellationToken);
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
Type IQueryable.ElementType
=> ShardingEntityQueryable.ElementType;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
Expression IQueryable.Expression
=> ShardingEntityQueryable.Expression;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IQueryProvider IQueryable.Provider
=> ShardingEntityQueryable.Provider;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IServiceProvider IInfrastructure<IServiceProvider>.Instance
=> _context.GetInfrastructure();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
void IResettableService.ResetState()
=> _localView = null;
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
/// <param name="cancellationToken"> A <see cref="CancellationToken" /> to observe while waiting for the task to complete. </param>
Task IResettableService.ResetStateAsync(CancellationToken cancellationToken)
{
((IResettableService)this).ResetState();
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,44 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using Microsoft.EntityFrameworkCore;
namespace ShardingCore.Sharding
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 21:03:03
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingTempInternalDbSet<TEntity> :
IQueryable<TEntity>,
IEnumerable<TEntity>,
IEnumerable,
IQueryable where TEntity:class
{
private IQueryable<TEntity> _queryable;
public ShardingTempInternalDbSet(EnumerableQuery<TEntity> queryable)
{
_queryable = queryable.AsQueryable();
}
public IEnumerator<TEntity> GetEnumerator()
{
return _queryable.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public Type ElementType => _queryable.ElementType;
public Expression Expression => _queryable.Expression;
public IQueryProvider Provider => _queryable.Provider;
}
}

View File

@ -22,7 +22,7 @@ namespace ShardingCore.Sharding
public class StreamMergeContext<T>
{
private readonly IShardingParallelDbContextFactory _shardingParallelDbContextFactory;
private readonly IShardingScopeFactory _shardingScopeFactory;
//private readonly IShardingScopeFactory _shardingScopeFactory;
private readonly IQueryable<T> _source;
private readonly IShardingDbContext _shardingDbContext;
private readonly IRoutingRuleEngineFactory _tableRoutingRuleEngineFactory;
@ -41,7 +41,7 @@ namespace ShardingCore.Sharding
IShardingParallelDbContextFactory shardingParallelDbContextFactory,IShardingScopeFactory shardingScopeFactory)
{
_shardingParallelDbContextFactory = shardingParallelDbContextFactory;
_shardingScopeFactory = shardingScopeFactory;
//_shardingScopeFactory = shardingScopeFactory;
_source = source;
_shardingDbContext = shardingDbContext;
_tableRoutingRuleEngineFactory = tableRoutingRuleEngineFactory;
@ -78,10 +78,10 @@ namespace ShardingCore.Sharding
return _tableRoutingRuleEngineFactory.Route(_source);
}
public ShardingScope CreateScope()
{
return _shardingScopeFactory.CreateScope();
}
//public ShardingScope CreateScope()
//{
// return _shardingScopeFactory.CreateScope();
//}
public IQueryable<T> GetReWriteQueryable()
{

View File

@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 14:22:10
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractInMemoryAsyncStreamMergeEngine<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public AbstractInMemoryAsyncStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public async Task<List<T>> ExecuteAsync(Func<IQueryable, Task<T>> efQuery,CancellationToken cancellationToken = new CancellationToken())
{
var tableResult = _mergeContext.GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
return Task.Run(async () =>
{
try
{
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var query = await efQuery(newQueryable);
return query;
//}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
return (await Task.WhenAll(enumeratorTasks)).ToList();
}
}
}

View File

@ -0,0 +1,135 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com
*/
public class AsyncEnumerableStreamMergeEngine<T> :IAsyncEnumerable<T>
{
private readonly StreamMergeContext<T> _mergeContext;
public AsyncEnumerableStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
//public static IStreamMergeEngine<T> Create<T>(StreamMergeContext<T> mergeContext)
//{
// return new AsyncEnumerableStreamMergeEngine<T>(mergeContext);
//}
//public async Task<IStreamMergeAsyncEnumerator<T>> GetAsyncEnumerator()
//{
// var tableResult = _mergeContext.GetRouteResults();
// var enumeratorTasks = tableResult.Select(routeResult =>
// {
// if (routeResult.ReplaceTables.Count > 1)
// throw new ShardingCoreException("route found more than 1 table name s");
// var tail = string.Empty;
// if (routeResult.ReplaceTables.Count == 1)
// tail = routeResult.ReplaceTables.First().Tail;
// return Task.Run(async () =>
// {
// try
// {
// //using (var scope = _mergeContext.CreateScope())
// //{
// //var shardingContext = ShardingContext.Create(routeResult);
// //scope.ShardingAccessor.ShardingContext = shardingContext;
// var shardingDbContext = _mergeContext.CreateDbContext(tail);
// var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable()
// .ReplaceDbContextQueryable(shardingDbContext);
// var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
// return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
// //}
// }
// catch (Exception e)
// {
// Console.WriteLine(e);
// throw;
// }
// });
// }).ToArray();
// var streamEnumerators = await Task.WhenAll(enumeratorTasks);
// if (_mergeContext.HasSkipTake())
// return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
// if (_mergeContext.HasGroupQuery())
// return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
// return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
//}
//public IStreamMergeEnumerator<T> GetEnumerator()
//{
// throw new NotImplementedException();
//}
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
}
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
var tableResult = _mergeContext.GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult =>
{
if (routeResult.ReplaceTables.Count > 1)
throw new ShardingCoreException("route found more than 1 table name s");
var tail = string.Empty;
if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail;
return Task.Run(async () =>
{
try
{
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
//}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (_mergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
}
}
}

View File

@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Extensions;
namespace ShardingCore.Sharding.StreamMergeEngines
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/17 15:16:36
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine<TResult>:AbstractInMemoryAsyncStreamMergeEngine<TResult>
{
private readonly StreamMergeContext<TResult> _mergeContext;
public FirstOrDefaultAsyncInMemoryAsyncStreamMergeEngine(StreamMergeContext<TResult> mergeContext) : base(mergeContext)
{
_mergeContext = mergeContext;
}
public async Task<TResult> ExecuteAsync(Func<IQueryable, Task<TResult>> efQuery, CancellationToken cancellationToken = new CancellationToken())
{
var result = await base.ExecuteAsync(efQuery, cancellationToken);
var q = result.Where(o => o != null).AsQueryable();
if (_mergeContext.Orders.Any())
return q.OrderWithExpression(_mergeContext.Orders).FirstOrDefault();
return q.FirstOrDefault();
}
}
}

View File

@ -61,11 +61,13 @@ namespace ShardingCore.Core.Internal.Visitors
{
var dbContextDependencies = typeof(DbContext).GetTypePropertyValue(_dbContext, "DbContextDependencies") as IDbContextDependencies;
var targetIQ = (IQueryable) ((IDbSetCache) _dbContext).GetOrAddSet(dbContextDependencies.SetSource, queryRootExpression.EntityType.ClrType);
var newQueryable = targetIQ.Provider.CreateQuery((Expression) Expression.Call((Expression) null, typeof(EntityFrameworkQueryableExtensions).GetTypeInfo().GetDeclaredMethod("AsNoTracking").MakeGenericMethod(queryRootExpression.EntityType.ClrType), targetIQ.Expression));
//AsNoTracking
//(Expression)Expression.Call((Expression)null, typeof(EntityFrameworkQueryableExtensions).GetTypeInfo().GetDeclaredMethod("AsNoTracking").MakeGenericMethod(queryRootExpression.EntityType.ClrType), targetIQ.Expression)
var newQueryable = targetIQ.Provider.CreateQuery(targetIQ.Expression);
Source = newQueryable;
//如何替换ef5的set
var replaceQueryRoot = new ReplaceSingleQueryRootExpressionVisitor();
replaceQueryRoot.Visit(newQueryable.Expression);
replaceQueryRoot.Visit(Source.Expression);
return base.VisitExtension(replaceQueryRoot.QueryRootExpression);
}

View File

@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Text;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/16 15:18:37
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ShardingConfig<T> where T:DbContext,IShardingTableDbContext
{
public Func<DbConnection, DbContextOptionsBuilder<T>, DbContextOptions<T>> ShardingDbContextOptionsCreator { get; private set; }
public void UseShardingDbContextOptions(Func<DbConnection, DbContextOptionsBuilder<T>, DbContextOptions<T>> shardingDbContextOptions)
{
ShardingDbContextOptionsCreator = shardingDbContextOptions ?? throw new ArgumentNullException(nameof(shardingDbContextOptions));
}
}
}

View File

@ -1,5 +1,5 @@
{
"SqlServer": {
"ConnectionString": "Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True;MultipleActiveResultSets=true"
"ConnectionString": "Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True"
}
}

View File

@ -22,7 +22,7 @@ namespace ShardingCore.Test50
private readonly ShardingDefaultDbContext _virtualDbContext;
private readonly IRoutingRuleEngineFactory _routingRuleEngineFactory;
public ShardingTest(ShardingDefaultDbContext virtualDbContext,IRoutingRuleEngineFactory routingRuleEngineFactory)
public ShardingTest(ShardingDefaultDbContext virtualDbContext, IRoutingRuleEngineFactory routingRuleEngineFactory)
{
_virtualDbContext = virtualDbContext;
_routingRuleEngineFactory = routingRuleEngineFactory;
@ -48,31 +48,24 @@ namespace ShardingCore.Test50
[Fact]
public async Task ToList_All_Test()
{
try
var mods = await _virtualDbContext.Set<SysUserMod>().ToListAsync();
Assert.Equal(1000, mods.Count);
var modOrders1 = await _virtualDbContext.Set<SysUserMod>().OrderBy(o => o.Age).ToListAsync();
int ascAge = 1;
foreach (var sysUserMod in modOrders1)
{
var mods = await _virtualDbContext.Set<SysUserMod>().ToListAsync();
Assert.Equal(1000, mods.Count);
var modOrders1 = await _virtualDbContext.Set<SysUserMod>().OrderBy(o => o.Age).ToListAsync();
int ascAge = 1;
foreach (var sysUserMod in modOrders1)
{
Assert.Equal(ascAge, sysUserMod.Age);
ascAge++;
}
var modOrders2 = await _virtualDbContext.Set<SysUserMod>().OrderByDescending(o => o.Age).ToListAsync();
int descAge = 1000;
foreach (var sysUserMod in modOrders2)
{
Assert.Equal(descAge, sysUserMod.Age);
descAge--;
}
Assert.Equal(ascAge, sysUserMod.Age);
ascAge++;
}
catch (Exception e)
var modOrders2 = await _virtualDbContext.Set<SysUserMod>().OrderByDescending(o => o.Age).ToListAsync();
int descAge = 1000;
foreach (var sysUserMod in modOrders2)
{
throw;
Assert.Equal(descAge, sysUserMod.Age);
descAge--;
}
}

View File

@ -49,8 +49,8 @@ namespace ShardingCore.Test50
services.AddShardingSqlServer(o =>
{
o.EnsureCreatedWithOutShardingTable = true;
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = false;
o.CreateShardingTableOnStart = false;
o.UseShardingDbContext<DefaultDbContext>( dbConfig =>
{
dbConfig.AddShardingTableRoute<SysUserModVirtualTableRoute>();
@ -82,7 +82,7 @@ namespace ShardingCore.Test50
var shardingBootstrapper = serviceProvider.GetService<IShardingBootstrapper>();
shardingBootstrapper.Start();
// 有一些测试数据要初始化可以放在这里
//InitData(serviceProvider).GetAwaiter().GetResult();
//InitData(serviceProvider).GetAwaiter().GetResult();
}
/// <summary>
@ -94,9 +94,7 @@ namespace ShardingCore.Test50
{
using (var scope = serviceProvider.CreateScope())
{
var virtualDbContext = scope.ServiceProvider.GetService<DefaultDbContext>();
if (!await virtualDbContext.Set<SysUserMod>().AnyAsync(o => true))
{
var virtualDbContext = scope.ServiceProvider.GetService<ShardingDefaultDbContext>();
var ids = Enumerable.Range(1, 1000);
var userMods = new List<SysUserMod>();
var userSalaries = new List<SysUserSalary>();
@ -136,7 +134,6 @@ namespace ShardingCore.Test50
await virtualDbContext.AddRangeAsync(userSalaries);
await virtualDbContext.SaveChangesAsync();
}
}
}
}