优化命名优化代码结构整理框架结构

This commit is contained in:
xuejiaming 2021-10-03 14:09:01 +08:00
parent 04a069677b
commit 74c6f7f474
50 changed files with 300 additions and 354 deletions

View File

@ -10,6 +10,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using ShardingCore.Extensions.ShardingPageExtensions;
namespace Sample.BulkConsole
{

View File

@ -13,6 +13,7 @@ using Sample.SqlServer.Domain.Entities;
using ShardingCore.Core.QueryRouteManagers.Abstractions;
using ShardingCore.DbContexts.VirtualDbContexts;
using ShardingCore.Extensions;
using ShardingCore.Extensions.ShardingPageExtensions;
namespace Sample.SqlServer.Controllers
{

View File

@ -36,6 +36,7 @@ namespace ShardingCore.EFCores
}
#if !EFCORE2
public TResult ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
@ -66,11 +67,13 @@ namespace ShardingCore.EFCores
public IAsyncEnumerable<TResult> ExecuteAsync<TResult>(Expression query)
{
return _shardingQueryExecutor.ExecuteAsync<IAsyncEnumerable<TResult>>(_currentContext, query);
}
public Task<TResult> ExecuteAsync<TResult>(Expression query, CancellationToken cancellationToken)
{
return _shardingQueryExecutor.ExecuteAsync<Task<TResult>>(_currentContext, query, cancellationToken);
}
public Func<QueryContext, TResult> CreateCompiledQuery<TResult>(Expression query)

View File

@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.ShardingPage.Abstractions;
namespace ShardingCore.Extensions
namespace ShardingCore.Extensions.ShardingPageExtensions
{
/*
* @Author: xjm
@ -15,7 +13,7 @@ namespace ShardingCore.Extensions
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public static class ShardingQueryableExtension
public static class ShardingPageExtension
{
public static async Task<ShardingPagedResult<T>> ToShardingPageAsync<T>(this IQueryable<T> source, int pageIndex, int pageSize)
{
@ -49,7 +47,7 @@ namespace ShardingCore.Extensions
using (shardingPageManager.CreateScope())
{
//获取每次总记录数
var count = source.Count();
var count = source.LongCount();
if (count <= skip)
return new ShardingPagedResult<T>(new List<T>(0), count);
var data = source.Skip(skip).Take(take).ToList();

View File

@ -1,6 +1,6 @@
using System.Collections.Generic;
namespace ShardingCore
namespace ShardingCore.Extensions.ShardingPageExtensions
{
/// <summary>
/// 分页集合

View File

@ -1,31 +1,16 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.EntityFrameworkCore.Storage;
using ShardingCore.Core;
using ShardingCore.Core.VirtualRoutes.TableRoutes;
using ShardingCore.Core.VirtualTables;
using ShardingCore.DbContexts;
using ShardingCore.DbContexts.ShardingDbContexts;
using ShardingCore.Exceptions;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ReadWriteConfigurations;
using ShardingCore.Sharding.ReadWriteConfigurations.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources.PhysicDataSources;
using ShardingCore.Core.VirtualDatabase.VirtualTables;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
using ShardingCore.Sharding.ShardingDbContextExecutors;
using ShardingCore.Sharding.ShardingTransactions;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding
{

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace ShardingCore.Sharding.MergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/10/2 17:25:33
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractBaseMergeEngine<TEntity>
{
}
}

View File

@ -1,12 +1,11 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm
@ -17,12 +16,10 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
*/
public abstract class AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
private readonly MethodCallExpression _methodCallExpression;
private readonly TEntity _constantItem;
protected AbstractEnsureMethodCallConstantInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_methodCallExpression = methodCallExpression;
var secondExpression = GetSecondExpression();
if (!(secondExpression is ConstantExpression constantExpression))
{
@ -35,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
{
if (!(secondExpression is ConstantExpression))
{
throw new InvalidOperationException(_methodCallExpression.ShardingPrint());
throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
return queryable;

View File

@ -1,13 +1,9 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm

View File

@ -1,12 +1,11 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm

View File

@ -1,11 +1,10 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines
{
/*
* @Author: xjm
@ -16,11 +15,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
*/
public abstract class AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine<TEntity, TResult> : AbstractEnsureMethodCallInMemoryAsyncMergeEngine<TEntity, TResult>
{
private readonly MethodCallExpression _methodCallExpression;
public AbstractEnsureMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_methodCallExpression = methodCallExpression;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
@ -30,7 +27,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureEx
return queryable.Where(predicate);
}
throw new InvalidOperationException(_methodCallExpression.ShardingPrint());
throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
}

View File

@ -1,13 +1,9 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{
/*
* @Author: xjm

View File

@ -1,12 +1,11 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{
/*
* @Author: xjm
@ -36,9 +35,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
return queryable;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> _queryable, Expression secondExpression)
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
{
return _queryable;
return queryable;
}
}

View File

@ -1,11 +1,10 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{
/*
* @Author: xjm
@ -16,11 +15,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
*/
public abstract class AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine<TEntity>: AbstractGenericMethodCallInMemoryAsyncMergeEngine<TEntity>
{
private readonly MethodCallExpression _methodCallExpression;
public AbstractGenericMethodCallWhereInMemoryAsyncMergeEngine(MethodCallExpression methodCallExpression, IShardingDbContext shardingDbContext) : base(methodCallExpression, shardingDbContext)
{
_methodCallExpression = methodCallExpression;
}
protected override IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression)
@ -32,7 +29,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericE
return queryable.Where(predicate);
}
}
throw new InvalidOperationException(_methodCallExpression.ShardingPrint());
throw new InvalidOperationException(GetMethodCallExpression().ShardingPrint());
}
}

View File

@ -1,16 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines
{
/*
* @Author: xjm

View File

@ -4,13 +4,12 @@ using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
/*
* @Author: xjm
@ -46,12 +45,13 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
_secondExpression = expression;
}
}
if(_queryable==null)
if (_queryable == null)
throw new ArgumentException($"argument not found IQueryable :[{methodCallExpression.ShardingPrint()}]");
if (methodCallExpression.Arguments.Count ==2)
if (methodCallExpression.Arguments.Count == 2)
{
if(_secondExpression == null)
if (_secondExpression == null)
throw new InvalidOperationException(methodCallExpression.ShardingPrint());
// ReSharper disable once VirtualMemberCallInConstructor
_queryable = CombineQueryable(_queryable, _secondExpression);
}
@ -66,12 +66,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
/// <returns></returns>
protected abstract IQueryable<TEntity> CombineQueryable(IQueryable<TEntity> queryable, Expression secondExpression);
private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname,TableRouteResult tableRouteResult)
private IQueryable CreateAsyncExecuteQueryable<TResult>(string dsname, TableRouteResult tableRouteResult)
{
var shardingDbContext = _mergeContext.CreateDbContext(dsname,tableRouteResult);
var newQueryable = (IQueryable<TEntity>) GetStreamMergeContext().GetReWriteQueryable()
var shardingDbContext = _mergeContext.CreateDbContext(dsname, tableRouteResult);
var newQueryable = (IQueryable<TEntity>)GetStreamMergeContext().GetReWriteQueryable()
.ReplaceDbContextQueryable(shardingDbContext);
var newCombineQueryable= DoCombineQueryable<TResult>(newQueryable);
var newCombineQueryable = DoCombineQueryable<TResult>(newQueryable);
return newCombineQueryable
;
}
@ -84,21 +84,14 @@ namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
{
return _mergeContext.TableRouteResults.Select(routeResult =>
{
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
return Task.Run(async () =>
{
try
{
var asyncExecuteQueryable = CreateAsyncExecuteQueryable<TResult>(dataSourceName, routeResult);
var queryResult = await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
//}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
var queryResult = await efQuery(asyncExecuteQueryable);
return new RouteQueryResult<TResult>(dataSourceName, routeResult, queryResult);
}, cancellationToken);
});
}).ToArray();

View File

@ -1,10 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
/*
* @Author: xjm

View File

@ -1,10 +1,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
/*
* @Author: xjm

View File

@ -1,11 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Sharding.StreamMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge
{
/*
* @Author: xjm

View File

@ -0,0 +1,149 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:35:39
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : IEnumeratorStreamMergeEngine<TEntity>
{
public StreamMergeContext<TEntity> StreamMergeContext { get; }
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
StreamMergeContext = streamMergeContext;
}
#if !EFCORE2
public IAsyncEnumerator<TEntity> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
return GetStreamMergeAsyncEnumerator(true, cancellationToken);
}
#endif
#if EFCORE2
IAsyncEnumerator<TEntity> IAsyncEnumerable<TEntity>.GetEnumerator()
{
return GetStreamMergeAsyncEnumerator(true);
}
#endif
public IEnumerator<TEntity> GetEnumerator()
{
return GetStreamMergeAsyncEnumerator(false);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Dispose()
{
StreamMergeContext.Dispose();
}
/// <summary>
/// 获取查询的迭代器
/// </summary>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
private IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var dbStreamMergeAsyncEnumerators = GetRouteQueryStreamMergeAsyncEnumerators(async);
if (dbStreamMergeAsyncEnumerators.IsEmpty())
throw new ShardingCoreException("GetRouteQueryStreamMergeAsyncEnumerators empty");
return CombineStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators);
}
/// <summary>
/// 获取路由查询的迭代器
/// </summary>
/// <param name="async"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken());
/// <summary>
/// 合并成一个迭代器
/// </summary>
/// <param name="streamsAsyncEnumerators"></param>
/// <returns></returns>
public abstract IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
/// <summary>
/// 开启异步线程获取并发迭代器
/// </summary>
/// <param name="queryable"></param>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncParallelQueryEnumerator(IQueryable<TEntity> queryable, bool async,CancellationToken cancellationToken=new CancellationToken())
{
return Task.Run(async () =>
{
if (async)
{
var asyncEnumerator = await GetAsyncEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = GetEnumerator0(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}, cancellationToken);
}
/// <summary>
/// 获取异步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public async Task<IAsyncEnumerator<TEntity>> GetAsyncEnumerator0(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
/// <summary>
/// 获取同步迭代器
/// </summary>
/// <param name="newQueryable"></param>
/// <returns></returns>
public IEnumerator<TEntity> GetEnumerator0(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
}
}

View File

@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
namespace ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge
{
/*
* @Author: xjm

View File

@ -11,8 +11,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{

View File

@ -8,8 +8,7 @@ using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{

View File

@ -11,8 +11,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines
{

View File

@ -10,8 +10,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -10,8 +10,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -6,8 +6,7 @@ using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -6,7 +6,7 @@ using System.Threading;
using System.Threading.Tasks;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -1,13 +1,12 @@
using ShardingCore.Sharding.ShardingQueryExecutors;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.TrackerManagers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.ShardingQueryExecutors;
using ShardingCore.Sharding.StreamMergeEngines.TrackerEnumerators;
namespace ShardingCore.Sharding.StreamMergeEngines
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
@ -15,12 +14,12 @@ namespace ShardingCore.Sharding.StreamMergeEngines
* @Date: Saturday, 14 August 2021 22:07:28
* @Email: 326308290@qq.com
*/
public class AsyncEnumerableStreamMergeEngine<TShardingDbContext,T> : IAsyncEnumerable<T>, IEnumerable<T>
public class AsyncEnumeratorStreamMergeEngine<TShardingDbContext,T> : IAsyncEnumerable<T>, IEnumerable<T>
where TShardingDbContext:DbContext,IShardingDbContext
{
private readonly StreamMergeContext<T> _mergeContext;
public AsyncEnumerableStreamMergeEngine(StreamMergeContext<T> mergeContext)
public AsyncEnumeratorStreamMergeEngine(StreamMergeContext<T> mergeContext)
{
_mergeContext = mergeContext;
}
@ -29,7 +28,8 @@ namespace ShardingCore.Sharding.StreamMergeEngines
#if !EFCORE2
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
{
var asyncEnumerator = new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync(cancellationToken)
cancellationToken.ThrowIfCancellationRequested();
var asyncEnumerator = EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetMergeEngine()
.GetAsyncEnumerator(cancellationToken);
if (_mergeContext.IsUseShardingTrack(typeof(T)))
@ -44,7 +44,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
#if EFCORE2
IAsyncEnumerator<T> IAsyncEnumerable<T>.GetEnumerator()
{
var asyncEnumerator = ((IAsyncEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync())
var asyncEnumerator = ((IAsyncEnumerable<T>)EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetMergeEngine())
.GetEnumerator();
if (_mergeContext.IsUseShardingTrack(typeof(T)))
{
@ -57,7 +57,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
public IEnumerator<T> GetEnumerator()
{
var enumerator = ((IEnumerable<T>)new EnumeratorShardingQueryExecutor<TShardingDbContext,T>(_mergeContext).ExecuteAsync())
var enumerator = ((IEnumerable<T>)EnumeratorStreamMergeEngineFactory<TShardingDbContext,T>.Create(_mergeContext).GetMergeEngine())
.GetEnumerator();
if (_mergeContext.IsUseShardingTrack(typeof(T)))

View File

@ -11,8 +11,8 @@ using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
@ -24,7 +24,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class AppenOrderSequenceEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly PaginationSequenceConfig _dataSourceSequenceOrderConfig;
@ -37,8 +37,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
_routeQueryResults = routeQueryResults;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
if (skip < 0)
@ -104,7 +105,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult, reSetOrders);
return AsyncQueryEnumerator(newQueryable, async);
return AsyncParallelQueryEnumerator(newQueryable, async, cancellationToken);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
@ -119,7 +120,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);

View File

@ -1,5 +1,5 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
@ -7,9 +7,9 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
/*
* @Author: xjm
@ -18,15 +18,16 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class DefaultShardingEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity> :AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class DefaultShardingEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity> :AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public DefaultShardingEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async, CancellationToken cancellationToken = new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var dataSourceRouteResult = StreamMergeContext.DataSourceRouteResult;
var enumeratorTasks = dataSourceRouteResult.IntersectDataSources.SelectMany(dataSourceName =>
{
@ -34,7 +35,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return tableRouteResults.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, routeResult);
return AsyncQueryEnumerator(newQueryable, async);
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
});
}).ToArray();
@ -51,7 +52,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.IsPaginationQuery())
return new PaginationStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.Internal.Visitors;
@ -10,7 +11,7 @@ using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
@ -21,7 +22,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class ReverseShardingEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
{
private readonly long _total;
@ -30,8 +31,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
_total = total;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var noPaginationNoOrderQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake().RemoveAnyOrderBy();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
var take = StreamMergeContext.Take.HasValue?StreamMergeContext.Take.Value:(_total-skip);
@ -49,7 +51,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return StreamMergeContext.TableRouteResults.Select(routeResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(dataSourceName, reverseOrderQueryable, routeResult);
return AsyncQueryEnumerator(newQueryable, async);
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
});
}).ToArray();;
@ -65,7 +67,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
var doGetStreamMergeAsyncEnumerator = DoGetStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
return new InMemoryReverseStreamMergeAsyncEnumerator<TEntity>(doGetStreamMergeAsyncEnumerator);

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
@ -9,8 +10,8 @@ using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Base;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
@ -22,7 +23,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class SequenceEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class SequenceEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly PaginationSequenceConfig _dataSourceSequenceMatchOrderConfig;
@ -37,8 +38,9 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
_isAsc = isAsc;
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var noPaginationQueryable = StreamMergeContext.GetOriginalQueryable().RemoveSkip().RemoveTake();
var skip = StreamMergeContext.Skip.GetValueOrDefault();
if (skip < 0)
@ -90,7 +92,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
var enumeratorTasks = sequenceResults.Select(sequenceResult =>
{
var newQueryable = CreateAsyncExecuteQueryable(sequenceResult.DSName, noPaginationQueryable, sequenceResult);
return AsyncQueryEnumerator(newQueryable, async);
return AsyncParallelQueryEnumerator(newQueryable, async,cancellationToken);
}).ToArray();
var streamEnumerators = Task.WhenAll(enumeratorTasks).WaitAndUnwrapException();
@ -105,7 +107,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
return newQueryable;
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (StreamMergeContext.HasGroupQuery())
return new MultiAggregateOrderStreamMergeAsyncEnumerator<TEntity>(StreamMergeContext, streamsAsyncEnumerators);

View File

@ -1,10 +1,11 @@
using System.Linq;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync
{
@ -14,33 +15,34 @@ namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.
* @Date: Thursday, 02 September 2021 20:58:10
* @Email: 326308290@qq.com
*/
public class SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorAsyncStreamMergeEngine<TEntity>
public class SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext, TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
public SingleQueryEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public override IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async)
public override IStreamMergeAsyncEnumerator<TEntity>[] GetRouteQueryStreamMergeAsyncEnumerators(bool async,CancellationToken cancellationToken=new CancellationToken())
{
cancellationToken.ThrowIfCancellationRequested();
var dataSourceName = StreamMergeContext.DataSourceRouteResult.IntersectDataSources.First();
var routeResult = StreamMergeContext.TableRouteResults.First();
var shardingDbContext = StreamMergeContext.CreateDbContext(dataSourceName,routeResult);
var newQueryable = (IQueryable<TEntity>) StreamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
if (async)
{
var asyncEnumerator = DoGetAsyncEnumerator(newQueryable).WaitAndUnwrapException();
var asyncEnumerator = GetAsyncEnumerator0(newQueryable).WaitAndUnwrapException();
return new[] { new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator) };
}
else
{
var enumerator = DoGetEnumerator(newQueryable);
var enumerator = GetEnumerator0(newQueryable);
return new[] { new StreamMergeAsyncEnumerator<TEntity>(enumerator) };
}
}
public override IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
public override IStreamMergeAsyncEnumerator<TEntity> CombineStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators)
{
if (streamsAsyncEnumerators.Length != 1)
throw new ShardingCoreException($"{nameof(SingleQueryEnumeratorAsyncStreamMergeEngine<TShardingDbContext,TEntity>)} has more {nameof(IStreamMergeAsyncEnumerator<TEntity>)}");

View File

@ -1,11 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Core.Internal.Visitors;
using ShardingCore.Core.ShardingPage.Abstractions;
using ShardingCore.Core.VirtualDatabase.VirtualDataSources;
@ -16,12 +13,12 @@ using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Extensions.InternalExtensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.StreamMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
using ShardingCore.Sharding.PaginationConfigurations;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.EnumeratorAsync;
namespace ShardingCore.Sharding.ShardingQueryExecutors
namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines
{
/*
* @Author: xjm
@ -30,24 +27,28 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public class EnumeratorShardingQueryExecutor<TShardingDbContext, TEntity>
public class EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity>
where TShardingDbContext : DbContext, IShardingDbContext
{
private readonly StreamMergeContext<TEntity> _streamMergeContext;
private readonly IShardingPageManager _shardingPageManager;
private readonly IVirtualTableManager<TShardingDbContext> _virtualTableManager;
private readonly IVirtualDataSource<TShardingDbContext> _virtualDataSource;
public EnumeratorShardingQueryExecutor(StreamMergeContext<TEntity> streamMergeContext)
private EnumeratorStreamMergeEngineFactory(StreamMergeContext<TEntity> streamMergeContext)
{
_streamMergeContext = streamMergeContext;
_shardingPageManager = ShardingContainer.GetService<IShardingPageManager>();
_virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
_virtualDataSource = ShardingContainer.GetService<IVirtualDataSource<TShardingDbContext>>();
}
public IEnumeratorStreamMergeEngine<TEntity> ExecuteAsync(CancellationToken cancellationToken = new CancellationToken())
public static EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity> Create(StreamMergeContext<TEntity> streamMergeContext)
{
return new EnumeratorStreamMergeEngineFactory<TShardingDbContext, TEntity>(streamMergeContext);
}
public IEnumeratorStreamMergeEngine<TEntity> GetMergeEngine()
{
cancellationToken.ThrowIfCancellationRequested();
//本次查询没有跨库没有跨表就可以直接执行
if (!_streamMergeContext.IsCrossDataSource&&!_streamMergeContext.IsCrossTable)
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -12,8 +12,7 @@ using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -12,8 +12,7 @@ using ShardingCore.Exceptions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractEnsureExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractEnsureMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -9,8 +9,7 @@ using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Helpers;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions.AbstractGenericExpressionMergeEngines;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge.AbstractGenericMergeEngines;
namespace ShardingCore.Sharding.StreamMergeEngines
{

View File

@ -9,8 +9,9 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using ShardingCore.Sharding.MergeEngines.Abstractions.InMemoryMerge;
using ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines;
using ShardingCore.Sharding.StreamMergeEngines.Abstractions;
using ShardingCore.Sharding.StreamMergeEngines.AggregateMergeEngines;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Internal;
@ -46,7 +47,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
public TResult ExecuteAsync<TResult>(ICurrentDbContext currentContext, Expression query,CancellationToken cancellationToken = new CancellationToken())
public TResult ExecuteAsync<TResult>(ICurrentDbContext currentContext, Expression query, CancellationToken cancellationToken = new CancellationToken())
{
var currentDbContext = currentContext.Context;
if (currentDbContext is IShardingDbContext shardingDbContext)
@ -70,7 +71,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
throw new ShardingCoreException("db context operator is not IShardingDbContext");
}
private TResult DoExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken())
private TResult DoExecute<TResult>(IShardingDbContext shardingDbContext, Expression query, bool async, CancellationToken cancellationToken = new CancellationToken())
{
if (query is MethodCallExpression methodCallExpression)
@ -132,13 +133,13 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
// private readonly IStreamMergeContextFactory _streamMergeContextFactory;
var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod("Create");
var streamMergeContextMethod = streamMergeContextFactory.GetType().GetMethod("Create");
if (streamMergeContextMethod == null)
throw new ShardingCoreException("cant found IStreamMergeContextFactory method [Create]");
var streamMergeContext = streamMergeContextMethod.MakeGenericMethod(new Type[] { queryEntityType }).Invoke(streamMergeContextFactory, new[] { queryable, shardingDbContext });
Type streamMergeEngineType = typeof(AsyncEnumerableStreamMergeEngine<,>);
Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine<,>);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
@ -148,7 +149,7 @@ namespace ShardingCore.Sharding.ShardingQueryExecutors
{
var queryEntityType = query.GetQueryEntityType();
var resultEntityType = query.GetResultType();
streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(),queryEntityType);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(shardingDbContext.GetType(), queryEntityType);
var streamEngine = Activator.CreateInstance(streamMergeEngineType, query, shardingDbContext);
var methodName = async ? nameof(IGenericMergeResult.MergeResultAsync) : nameof(IGenericMergeResult.MergeResult);
var streamEngineMethod = streamMergeEngineType.GetMethod(methodName);

View File

@ -1,100 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Exceptions;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
#if EFCORE2
using Microsoft.EntityFrameworkCore.Extensions.Internal;
#endif
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:38:05
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorAsyncStreamMergeEngine<TEntity> : AbstractEnumeratorStreamMergeEngine<TEntity>
{
public AbstractEnumeratorAsyncStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext) : base(streamMergeContext)
{
}
public abstract IStreamMergeAsyncEnumerator<TEntity>[] GetDbStreamMergeAsyncEnumerators(bool async);
public abstract IStreamMergeAsyncEnumerator<TEntity> GetStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<TEntity>[] streamsAsyncEnumerators);
public Task<StreamMergeAsyncEnumerator<TEntity>> AsyncQueryEnumerator(IQueryable<TEntity> queryable, bool async)
{
return Task.Run(async () =>
{
try
{
if (async)
{
var asyncEnumerator = await DoGetAsyncEnumerator(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(asyncEnumerator);
}
else
{
var enumerator = DoGetEnumerator(queryable);
return new StreamMergeAsyncEnumerator<TEntity>(enumerator);
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}
public async Task<IAsyncEnumerator<TEntity>> DoGetAsyncEnumerator(IQueryable<TEntity> newQueryable)
{
#if !EFCORE2
var enumator = newQueryable.AsAsyncEnumerable().GetAsyncEnumerator();
await enumator.MoveNextAsync();
return enumator;
#endif
#if EFCORE2
var enumator = new EFCore2TryCurrentAsyncEnumerator<TEntity>(newQueryable.AsAsyncEnumerable().GetEnumerator());
await enumator.MoveNext();
return enumator;
#endif
}
public IEnumerator<TEntity> DoGetEnumerator(IQueryable<TEntity> newQueryable)
{
var enumator = newQueryable.AsEnumerable().GetEnumerator();
enumator.MoveNext();
return enumator;
}
// public virtual IQueryable<TEntity> CreateAsyncExecuteQueryable(RouteResult routeResult)
// {
// var shardingDbContext = StreamMergeContext.CreateDbContext(routeResult);
// var useOriginal = StreamMergeContext > 1;
// DbContextQueryStore.TryAdd(routeResult,shardingDbContext);
// var newQueryable = (IQueryable<TEntity>)(useOriginal ? StreamMergeContext.GetReWriteQueryable() : StreamMergeContext.GetOriginalQueryable())
// .ReplaceDbContextQueryable(shardingDbContext);
// return newQueryable;
// }
public override IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken())
{
var dbStreamMergeAsyncEnumerators = GetDbStreamMergeAsyncEnumerators(async);
if (dbStreamMergeAsyncEnumerators.IsEmpty())
throw new ShardingCoreException("GetDbStreamMergeAsyncEnumerators empty");
return GetStreamMergeAsyncEnumerator(dbStreamMergeAsyncEnumerators);
}
}
}

View File

@ -1,67 +0,0 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using ShardingCore.Core.VirtualRoutes.TableRoutes.RoutingRuleEngine;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Enumerators;
namespace ShardingCore.Sharding.StreamMergeEngines.EnumeratorStreamMergeEngines.Abstractions
{
/*
* @Author: xjm
* @Description:
* @Date: 2021/9/2 15:35:39
* @Ver: 1.0
* @Email: 326308290@qq.com
*/
public abstract class AbstractEnumeratorStreamMergeEngine<TEntity> : IEnumeratorStreamMergeEngine<TEntity>
{
public StreamMergeContext<TEntity> StreamMergeContext { get; }
public AbstractEnumeratorStreamMergeEngine(StreamMergeContext<TEntity> streamMergeContext)
{
StreamMergeContext = streamMergeContext;
}
public abstract IStreamMergeAsyncEnumerator<TEntity> GetShardingAsyncEnumerator(bool async,
CancellationToken cancellationToken = new CancellationToken());
#if !EFCORE2
public IAsyncEnumerator<TEntity> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
return GetShardingAsyncEnumerator(true,cancellationToken);
}
#endif
#if EFCORE2
IAsyncEnumerator<TEntity> IAsyncEnumerable<TEntity>.GetEnumerator()
{
return GetShardingAsyncEnumerator(true);
}
#endif
public IEnumerator<TEntity> GetEnumerator()
{
return GetShardingAsyncEnumerator(false);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Dispose()
{
StreamMergeContext.Dispose();
}
}
}