This commit is contained in:
xuejiaming 2021-08-20 17:36:15 +08:00
parent 4accc08567
commit 9309e587c8
10 changed files with 305 additions and 107 deletions

View File

@ -6,11 +6,11 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.9" /> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.18" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\src\ShardingCore\ShardingCore.csproj" /> <ProjectReference Include="..\..\src3x\ShardingCore.3x\ShardingCore.3x.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -21,10 +21,10 @@ namespace Sample.SqlServer
public void ConfigureServices(IServiceCollection services) public void ConfigureServices(IServiceCollection services)
{ {
services.AddControllers(); services.AddControllers();
services.AddDbContext<DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDB;Integrated Security=True")); //services.AddDbContext<DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx3;Integrated Security=True"));
services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx;Integrated Security=True;MultipleActiveResultSets=True;") services.AddShardingDbContext<DefaultShardingDbContext, DefaultTableDbContext>(o => o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx3;Integrated Security=True;MultipleActiveResultSets=True;")
,op => ,op =>
{ {
op.EnsureCreatedWithOutShardingTable = true; op.EnsureCreatedWithOutShardingTable = true;

View File

@ -12,8 +12,11 @@ namespace ShardingCore.EFCores
* @Date: Saturday, 14 August 2021 10:17:43 * @Date: Saturday, 14 August 2021 10:17:43
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
#if !EFCORE2
public class ShardingDbSetSource:IDbSetSource public class ShardingDbSetSource:IDbSetSource
{ {
#if EFCORE5
private static readonly MethodInfo _genericCreateSet private static readonly MethodInfo _genericCreateSet
= typeof(ShardingDbSetSource).GetTypeInfo().GetDeclaredMethod(nameof(CreateSetFactory)); = typeof(ShardingDbSetSource).GetTypeInfo().GetDeclaredMethod(nameof(CreateSetFactory));
@ -41,12 +44,90 @@ namespace ShardingCore.EFCores
private object CreateCore(DbContext context, Type type, string name, MethodInfo createMethod) private object CreateCore(DbContext context, Type type, string name, MethodInfo createMethod)
=> _cache.GetOrAdd( => _cache.GetOrAdd(
(type, name), (type, name),
t => (Func<DbContext,string, object>)createMethod t => (Func<DbContext, string, object>)createMethod
.MakeGenericMethod(t.Type) .MakeGenericMethod(t.Type)
.Invoke(null, null))(context, name); .Invoke(null, null))(context, name);
private static Func<DbContext, string, object> CreateSetFactory<TEntity>() private static Func<DbContext, string, object> CreateSetFactory<TEntity>()
where TEntity : class where TEntity : class
=> (c, name) => new ShardingInternalDbSet<TEntity>(c, name); => (c, name) => new ShardingInternalDbSet<TEntity>(c, name);
#endif
#if EFCORE3
private static readonly MethodInfo _genericCreateSet
= typeof(ShardingDbSetSource).GetTypeInfo().GetDeclaredMethod(nameof(CreateSetFactory));
private readonly ConcurrentDictionary<Type, Func<DbContext, object>> _cache
= new ConcurrentDictionary<Type, Func<DbContext, object>>();
/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual object Create(DbContext context, Type type)
=> CreateCore(context, type, _genericCreateSet);
private object CreateCore(DbContext context, Type type, MethodInfo createMethod)
=> _cache.GetOrAdd(
type,
t => (Func<DbContext, object>)createMethod
.MakeGenericMethod(t)
.Invoke(null, null))(context);
private static Func<DbContext, object> CreateSetFactory<TEntity>()
where TEntity : class
=> c => new ShardingInternalDbSet<TEntity>(c);
#endif
} }
#endif
#if EFCORE2
/// <summary>
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
/// directly from your code. This API may change or be removed in future releases.
/// </summary>
public class ShardingDbSetSource : IDbSetSource, IDbQuerySource
{
private static readonly MethodInfo _genericCreateSet
= typeof(ShardingDbSetSource).GetTypeInfo().GetDeclaredMethod(nameof(CreateSetFactory));
private static readonly MethodInfo _genericCreateQuery
= typeof(ShardingDbSetSource).GetTypeInfo().GetDeclaredMethod(nameof(CreateQueryFactory));
private readonly ConcurrentDictionary<Type, Func<DbContext, object>> _cache
= new ConcurrentDictionary<Type, Func<DbContext, object>>();
/// <summary>
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
/// directly from your code. This API may change or be removed in future releases.
/// </summary>
public virtual object Create(DbContext context, Type type)
=> CreateCore(context, type, _genericCreateSet);
/// <summary>
/// This API supports the Entity Framework Core infrastructure and is not intended to be used
/// directly from your code. This API may change or be removed in future releases.
/// </summary>
public virtual object CreateQuery(DbContext context, Type type)
=> CreateCore(context, type, _genericCreateQuery);
private object CreateCore(DbContext context, Type type, MethodInfo createMethod)
=> _cache.GetOrAdd(
type,
t => (Func<DbContext, object>)createMethod
.MakeGenericMethod(t)
.Invoke(null, null))(context);
private static Func<DbContext, object> CreateSetFactory<TEntity>()
where TEntity : class
=> c => new ShardingInternalDbSet<TEntity>(c);
private static Func<DbContext, DbQuery<TQuery>> CreateQueryFactory<TQuery>()
where TQuery : class
=> c => new ShardingInternalDbQuery<TQuery>(c);
}
#endif
} }

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Internal;
namespace ShardingCore.EFCores
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/8/20 17:05:36
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
#if EFCORE2
public class ShardingInternalDbQuery<TQuery> : InternalDbQuery<TQuery> where TQuery : class
{
public ShardingInternalDbQuery(DbContext context) : base(context)
{
}
}
#endif
}

View File

@ -1,4 +1,5 @@
using Microsoft.EntityFrameworkCore; 
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking; using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.Internal; using Microsoft.EntityFrameworkCore.Internal;
using ShardingCore.Core; using ShardingCore.Core;
@ -17,15 +18,25 @@ namespace ShardingCore.EFCores
* @Ver: 1.0 * @Ver: 1.0
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class ShardingInternalDbSet<TEntity> :InternalDbSet<TEntity>
public class ShardingInternalDbSet<TEntity> : InternalDbSet<TEntity>
where TEntity : class where TEntity : class
{ {
private readonly DbContext _context; private readonly DbContext _context;
#if EFCORE5
public ShardingInternalDbSet(DbContext context, string entityTypeName) : base(context, entityTypeName) public ShardingInternalDbSet(DbContext context, string entityTypeName) : base(context, entityTypeName)
{ {
_context = context; _context = context;
} }
#endif
#if !EFCORE5
public ShardingInternalDbSet(DbContext context) : base(context)
{
_context = context;
}
#endif
/// <summary> /// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in /// the same compatibility standards as public APIs. It may be changed or removed without notice in
@ -34,7 +45,7 @@ namespace ShardingCore.EFCores
/// </summary> /// </summary>
public override EntityEntry<TEntity> Add(TEntity entity) public override EntityEntry<TEntity> Add(TEntity entity)
{ {
var genericDbContext = ((IShardingDbContext) _context).CreateGenericDbContext(entity); var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return genericDbContext.Add(entity); return genericDbContext.Add(entity);
} }
@ -44,14 +55,24 @@ namespace ShardingCore.EFCores
/// any release. You should only use it directly in your code with extreme caution and knowing that /// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release. /// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary> /// </summary>
#if !EFCORE2
public override async ValueTask<EntityEntry<TEntity>> AddAsync( public override async ValueTask<EntityEntry<TEntity>> AddAsync(
TEntity entity, TEntity entity,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var genericDbContext = ((IShardingDbContext) _context).CreateGenericDbContext(entity); var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return await genericDbContext.AddAsync(entity, cancellationToken); return await genericDbContext.AddAsync(entity, cancellationToken);
} }
#endif
#if EFCORE2
public override async Task<EntityEntry<TEntity>> AddAsync(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
{
var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return await genericDbContext.AddAsync(entity, cancellationToken);
}
#endif
/// <summary> /// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
@ -61,7 +82,7 @@ namespace ShardingCore.EFCores
/// </summary> /// </summary>
public override EntityEntry<TEntity> Attach(TEntity entity) public override EntityEntry<TEntity> Attach(TEntity entity)
{ {
var genericDbContext = ((IShardingDbContext) _context).CreateGenericDbContext(entity); var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return genericDbContext.Attach(entity); return genericDbContext.Attach(entity);
} }
@ -75,7 +96,7 @@ namespace ShardingCore.EFCores
{ {
Check.NotNull(entity, nameof(entity)); Check.NotNull(entity, nameof(entity));
var genericDbContext = ((IShardingDbContext) _context).CreateGenericDbContext(entity); var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return genericDbContext.Remove(entity); return genericDbContext.Remove(entity);
} }
@ -87,7 +108,7 @@ namespace ShardingCore.EFCores
/// </summary> /// </summary>
public override EntityEntry<TEntity> Update(TEntity entity) public override EntityEntry<TEntity> Update(TEntity entity)
{ {
var genericDbContext = ((IShardingDbContext) _context).CreateGenericDbContext(entity); var genericDbContext = ((IShardingDbContext)_context).CreateGenericDbContext(entity);
return genericDbContext.Update(entity); return genericDbContext.Update(entity);
} }
@ -264,7 +285,7 @@ namespace ShardingCore.EFCores
foreach (var group in groups) foreach (var group in groups)
{ {
await group.Key.AddRangeAsync(group.Select(o => o.Entity)); await group.Key.AddRangeAsync(group.Select(o => o.Entity));
} }
} }
@ -276,7 +297,7 @@ namespace ShardingCore.EFCores
/// </summary> /// </summary>
public override void AttachRange(IEnumerable<TEntity> entities) public override void AttachRange(IEnumerable<TEntity> entities)
{ {
var groups = entities.Select(o => var groups = entities.Select(o =>
{ {
var dbContext = ((IShardingDbContext)_context).CreateGenericDbContext(o); var dbContext = ((IShardingDbContext)_context).CreateGenericDbContext(o);
@ -289,7 +310,7 @@ namespace ShardingCore.EFCores
foreach (var group in groups) foreach (var group in groups)
{ {
group.Key.AttachRange(group.Select(o => o.Entity)); group.Key.AttachRange(group.Select(o => o.Entity));
} }
} }
@ -346,4 +367,5 @@ namespace ShardingCore.EFCores
} }
} }
}
}

View File

@ -91,7 +91,7 @@ namespace ShardingCore.Sharding
{ {
if (!_dbContextCaches.TryGetValue(tail, out var dbContext)) if (!_dbContextCaches.TryGetValue(tail, out var dbContext))
{ {
dbContext = _shardingDbContextFactory.Create(ShardingDbContextType,CreateSameShardingDbContextOptions(tail == EMPTY_SHARDING_TAIL_ID ? string.Empty : tail)); dbContext = _shardingDbContextFactory.Create(ShardingDbContextType, CreateSameShardingDbContextOptions(tail == EMPTY_SHARDING_TAIL_ID ? string.Empty : tail));
_dbContextCaches.TryAdd(tail, dbContext); _dbContextCaches.TryAdd(tail, dbContext);
} }
@ -105,7 +105,7 @@ namespace ShardingCore.Sharding
var tail = EMPTY_SHARDING_TAIL_ID; var tail = EMPTY_SHARDING_TAIL_ID;
if (entity.IsShardingTable()) if (entity.IsShardingTable())
{ {
var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType,entity.GetType()).RouteTo(new TableRouteConfig(null, entity as IShardingTable, null))[0]; var physicTable = _virtualTableManager.GetVirtualTable(ShardingDbContextType, entity.GetType()).RouteTo(new TableRouteConfig(null, entity as IShardingTable, null))[0];
tail = physicTable.Tail; tail = physicTable.Tail;
} }
@ -134,6 +134,8 @@ namespace ShardingCore.Sharding
return CreateGenericDbContext(entity).Add(entity); return CreateGenericDbContext(entity).Add(entity);
} }
#if !EFCORE2
public override ValueTask<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken()) public override ValueTask<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
{ {
return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken); return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
@ -143,6 +145,19 @@ namespace ShardingCore.Sharding
{ {
return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken); return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
} }
#endif
#if EFCORE2
public override Task<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
{
return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
}
public override Task<EntityEntry> AddAsync(object entity, CancellationToken cancellationToken = new CancellationToken())
{
return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
}
#endif
public override void AddRange(params object[] entities) public override void AddRange(params object[] entities)
{ {
@ -443,6 +458,8 @@ namespace ShardingCore.Sharding
} }
return i; return i;
} }
#if !EFCORE2
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
@ -481,6 +498,49 @@ namespace ShardingCore.Sharding
} }
return i; return i;
} }
#endif
#if EFCORE2
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
{
var isBeginTransaction = IsBeginTransaction;
//如果是内部开的事务就内部自己消化
if (!isBeginTransaction)
{
await Database.BeginTransactionAsync(cancellationToken);
}
int i = 0;
try
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
i += await dbContextCache.Value.SaveChangesAsync(cancellationToken);
}
if (!isBeginTransaction)
Database.CurrentTransaction.Commit();
}
finally
{
if (!isBeginTransaction) { }
if (Database.CurrentTransaction != null)
{
Database.CurrentTransaction.Dispose();
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(null);
}
}
}
return i;
}
#endif
#if !EFCORE2
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken()) public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{ {
@ -520,6 +580,48 @@ namespace ShardingCore.Sharding
} }
return i; return i;
} }
#endif
#if EFCORE2
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
{
var isBeginTransaction = IsBeginTransaction;
//如果是内部开的事务就内部自己消化
if (!isBeginTransaction)
{
await Database.BeginTransactionAsync(cancellationToken);
}
int i = 0;
try
{
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(Database.CurrentTransaction.GetDbTransaction());
i += await dbContextCache.Value.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}
if (!isBeginTransaction)
Database.CurrentTransaction.Commit(cancellationToken);
}
finally
{
if (!isBeginTransaction)
if (Database.CurrentTransaction != null)
{
Database.CurrentTransaction.Dispose();
foreach (var dbContextCache in _dbContextCaches)
{
dbContextCache.Value.Database.UseTransaction(null);
}
}
}
return i;
}
#endif
public override void Dispose() public override void Dispose()
{ {
@ -537,6 +639,7 @@ namespace ShardingCore.Sharding
base.Dispose(); base.Dispose();
} }
#if !EFCORE2
public override async ValueTask DisposeAsync() public override async ValueTask DisposeAsync()
{ {
foreach (var dbContextCache in _dbContextCaches) foreach (var dbContextCache in _dbContextCaches)
@ -553,6 +656,7 @@ namespace ShardingCore.Sharding
await base.DisposeAsync(); await base.DisposeAsync();
} }
#endif
} }
} }

View File

@ -11,6 +11,10 @@ using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync; using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeSync; using ShardingCore.Sharding.Enumerators.StreamMergeSync;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.StreamMergeEngines namespace ShardingCore.Sharding.StreamMergeEngines
{ {
/* /*
@ -19,7 +23,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Date: Saturday, 14 August 2021 22:07:28 * @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com * @Email: 326308290@qq.com
*/ */
public class AsyncEnumerableStreamMergeEngine<T> :IAsyncEnumerable<T>,IEnumerable<T> public class AsyncEnumerableStreamMergeEngine<T> : IAsyncEnumerable<T>, IEnumerable<T>
{ {
private readonly StreamMergeContext<T> _mergeContext; private readonly StreamMergeContext<T> _mergeContext;
@ -29,59 +33,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines
_mergeContext = mergeContext; _mergeContext = mergeContext;
} }
//public static IStreamMergeEngine<T> Create<T>(StreamMergeContext<T> mergeContext)
//{
// return new AsyncEnumerableStreamMergeEngine<T>(mergeContext);
//}
//public async Task<IStreamMergeAsyncEnumerator<T>> GetAsyncEnumerator()
//{
// var tableResult = _mergeContext.GetRouteResults();
// var enumeratorTasks = tableResult.Select(routeResult =>
// {
// if (routeResult.ReplaceTables.Count > 1)
// throw new ShardingCoreException("route found more than 1 table name s");
// var tail = string.Empty;
// if (routeResult.ReplaceTables.Count == 1)
// tail = routeResult.ReplaceTables.First().Tail;
// return Task.Run(async () =>
// {
// try
// {
// //using (var scope = _mergeContext.CreateScope())
// //{
// //var shardingContext = ShardingContext.Create(routeResult);
// //scope.ShardingAccessor.ShardingContext = shardingContext;
// var shardingDbContext = _mergeContext.CreateDbContext(tail);
// var newQueryable = (IQueryable<T>) _mergeContext.GetReWriteQueryable()
// .ReplaceDbContextQueryable(shardingDbContext);
// var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
// return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
// //}
// }
// catch (Exception e)
// {
// Console.WriteLine(e);
// throw;
// }
// });
// }).ToArray();
// var streamEnumerators = await Task.WhenAll(enumeratorTasks);
// if (_mergeContext.HasSkipTake())
// return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
// if (_mergeContext.HasGroupQuery())
// return new MultiAggregateOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
// return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
//}
//public IStreamMergeEnumerator<T> GetEnumerator()
//{
// throw new NotImplementedException();
//}
#if !EFCORE2
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable) private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{ {
@ -89,8 +42,33 @@ namespace ShardingCore.Sharding.StreamMergeEngines
await enumator.MoveNextAsync(); await enumator.MoveNextAsync();
return enumator; return enumator;
} }
#endif
#if EFCORE2
private async Task<IAsyncEnumerator<T>> GetAsyncEnumerator(IQueryable<T> newQueryable)
{
var enumator = newQueryable.AsAsyncEnumerable().GetEnumerator();
await enumator.MoveNext();
return enumator;
}
#endif
#if !EFCORE2
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
return GetShardingEnumerator();
}
#endif
#if EFCORE2
IAsyncEnumerator<T> IAsyncEnumerable<T>.GetEnumerator()
{
return GetShardingEnumerator();
}
#endif
private IAsyncEnumerator<T> GetShardingEnumerator()
{ {
var tableResult = _mergeContext.GetRouteResults(); var tableResult = _mergeContext.GetRouteResults();
var enumeratorTasks = tableResult.Select(routeResult => var enumeratorTasks = tableResult.Select(routeResult =>
@ -105,18 +83,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{ {
try try
{ {
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail); var shardingDbContext = _mergeContext.CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable() var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
var asyncEnumerator = await GetAsyncEnumerator(newQueryable); var asyncEnumerator = await GetAsyncEnumerator(newQueryable);
return new StreamMergeAsyncEnumerator<T>(asyncEnumerator); return new StreamMergeAsyncEnumerator<T>(asyncEnumerator);
//}
} }
catch (Exception e) catch (Exception e)
{ {
@ -126,7 +98,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
}); });
}).ToArray(); }).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
if (_mergeContext.HasSkipTake()) if (_mergeContext.HasSkipTake())
return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators); return new PaginationStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
if (_mergeContext.HasGroupQuery()) if (_mergeContext.HasGroupQuery())
@ -134,10 +106,11 @@ namespace ShardingCore.Sharding.StreamMergeEngines
return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators); return new MultiOrderStreamMergeAsyncEnumerator<T>(_mergeContext, streamEnumerators);
} }
private IEnumerator<T> GetEnumerator(IQueryable<T> newQueryable)
private IEnumerator<T> GetEnumerator(IQueryable<T> newQueryable)
{ {
var enumator = newQueryable.AsEnumerable().GetEnumerator(); var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext(); enumator.MoveNext();
return enumator; return enumator;
} }
public IEnumerator<T> GetEnumerator() public IEnumerator<T> GetEnumerator()
@ -151,29 +124,24 @@ namespace ShardingCore.Sharding.StreamMergeEngines
if (routeResult.ReplaceTables.Count == 1) if (routeResult.ReplaceTables.Count == 1)
tail = routeResult.ReplaceTables.First().Tail; tail = routeResult.ReplaceTables.First().Tail;
return Task.Run( () => return Task.Run(() =>
{ {
try try
{ {
//using (var scope = _mergeContext.CreateScope())
//{
//var shardingContext = ShardingContext.Create(routeResult);
//scope.ShardingAccessor.ShardingContext = shardingContext;
var shardingDbContext = _mergeContext.CreateDbContext(tail); var shardingDbContext = _mergeContext.CreateDbContext(tail);
var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable() var newQueryable = (IQueryable<T>)_mergeContext.GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext); .ReplaceDbContextQueryable(shardingDbContext);
var enumerator = GetEnumerator(newQueryable); var enumerator = GetEnumerator(newQueryable);
return new StreamMergeEnumerator<T>(enumerator); return new StreamMergeEnumerator<T>(enumerator);
//} }
} catch (Exception e)
catch (Exception e) {
{ Console.WriteLine(e);
Console.WriteLine(e); throw;
throw; }
} });
});
}).ToArray(); }).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult(); var streamEnumerators = Task.WhenAll(enumeratorTasks).GetAwaiter().GetResult();
@ -188,5 +156,6 @@ namespace ShardingCore.Sharding.StreamMergeEngines
{ {
return GetEnumerator(); return GetEnumerator();
} }
} }
} }

View File

@ -16,6 +16,5 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.7" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.7" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="5.0.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -25,7 +25,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.6" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.2.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.6" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.2.6" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="2.1.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -28,9 +28,8 @@
<Compile Remove="..\..\src\ShardingCore\ShardingTableConfig.cs" /> <Compile Remove="..\..\src\ShardingCore\ShardingTableConfig.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.10" /> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.18" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.10" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.18" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="3.1.6" />
</ItemGroup> </ItemGroup>
</Project> </Project>