[#136] 修复单个查询不可为空类型(值类型的bug)
This commit is contained in:
parent
4284b54c7d
commit
8e297e0857
|
@ -0,0 +1,46 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ShardingCore.Extensions;
|
||||
using ShardingCore.Sharding.MergeEngines.Abstractions;
|
||||
using ShardingCore.Sharding.MergeEngines.Common;
|
||||
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
|
||||
using ShardingCore.Sharding.StreamMergeEngines;
|
||||
|
||||
namespace ShardingCore.Sharding.MergeEngines.Executors.Methods.Abstractions
|
||||
{
|
||||
/// <summary>
|
||||
///
|
||||
/// </summary>
|
||||
/// Author: xjm
|
||||
/// Created: 2022/5/7 7:45:07
|
||||
/// Email: 326308290@qq.com
|
||||
internal abstract class AbstractOneMethodExecutor<TResult> : AbstractExecutor<RouteQueryResult<TResult>>
|
||||
{
|
||||
protected AbstractOneMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
}
|
||||
|
||||
protected override async Task<ShardingMergeResult<RouteQueryResult<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
var connectionMode = GetStreamMergeContext().RealConnectionMode(sqlExecutorUnit.ConnectionMode);
|
||||
var dataSourceName = sqlExecutorUnit.RouteUnit.DataSourceName;
|
||||
var routeResult = sqlExecutorUnit.RouteUnit.TableRouteResult;
|
||||
|
||||
var shardingDbContext = GetStreamMergeContext().CreateDbContext(dataSourceName, routeResult, connectionMode);
|
||||
var newQueryable = GetStreamMergeContext().GetReWriteQueryable()
|
||||
.ReplaceDbContextQueryable(shardingDbContext);
|
||||
|
||||
|
||||
var queryResult = await EFCoreQueryAsync(newQueryable, cancellationToken);
|
||||
var q=queryResult != null ? queryResult.Entity : default;
|
||||
var routeQueryResult = new RouteQueryResult<TResult>(dataSourceName, routeResult, q, queryResult != null);
|
||||
return new ShardingMergeResult<RouteQueryResult<TResult>>(shardingDbContext, routeQueryResult);
|
||||
}
|
||||
|
||||
protected abstract Task<OneMethodResult<TResult>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken());
|
||||
}
|
||||
}
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 8:48:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class FirstMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class FirstMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public FirstMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -29,9 +29,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return new AnyElementCircuitBreaker(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().FirstOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o => new OneMethodResult<TEntity>(o)).FirstOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 8:48:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class FirstOrDefaultMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class FirstOrDefaultMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public FirstOrDefaultMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -29,9 +29,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return new AnyElementCircuitBreaker(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().FirstOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o=>new OneMethodResult<TEntity>(o)).FirstOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 11:01:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class LastMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class LastMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public LastMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -29,9 +29,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return new AnyElementCircuitBreaker(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().LastOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o => new OneMethodResult<TEntity>(o)).LastOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 11:01:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class LastOrDefaultMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class LastOrDefaultMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public LastOrDefaultMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -29,9 +29,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return new AnyElementCircuitBreaker(GetStreamMergeContext());
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().LastOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o => new OneMethodResult<TEntity>(o)).LastOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 11:01:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class SingleMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class SingleMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public SingleMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -34,9 +34,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return circuitBreaker;
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().SingleOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o => new OneMethodResult<TEntity>(o)).SingleOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
/// Author: xjm
|
||||
/// Created: 2022/5/7 11:01:12
|
||||
/// Email: 326308290@qq.com
|
||||
internal class SingleOrDefaultMethodExecutor<TEntity> : AbstractMethodExecutor<TEntity>
|
||||
internal class SingleOrDefaultMethodExecutor<TEntity> : AbstractOneMethodExecutor<TEntity>
|
||||
{
|
||||
public SingleOrDefaultMethodExecutor(StreamMergeContext streamMergeContext) : base(streamMergeContext)
|
||||
{
|
||||
|
@ -34,9 +34,9 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Methods
|
|||
return circuitBreaker;
|
||||
}
|
||||
|
||||
protected override Task<TEntity> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
protected override Task<OneMethodResult<TEntity>> EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken())
|
||||
{
|
||||
return queryable.As<IQueryable<TEntity>>().SingleOrDefaultAsync(cancellationToken);
|
||||
return queryable.As<IQueryable<TEntity>>().Select(o => new OneMethodResult<TEntity>(o)).SingleOrDefaultAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
|
||||
protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
|
||||
{
|
||||
var notNullResult = resultList.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
|
||||
var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
|
||||
|
||||
if (notNullResult.IsEmpty())
|
||||
return default;
|
||||
|
|
|
@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
|
||||
protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
|
||||
{
|
||||
var notNullResult = resultList.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
|
||||
var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
|
||||
|
||||
if (notNullResult.IsEmpty())
|
||||
throw new InvalidOperationException("Sequence contains no elements.");
|
||||
|
|
|
@ -51,7 +51,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
|
||||
protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
|
||||
{
|
||||
var notNullResult = resultList.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
|
||||
var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
|
||||
|
||||
if (notNullResult.IsEmpty())
|
||||
return default;
|
||||
|
|
|
@ -15,6 +15,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
}
|
||||
public class RouteQueryResult<TResult>: IRouteQueryResult
|
||||
{
|
||||
private readonly bool _hasValue;
|
||||
public string DataSourceName { get; }
|
||||
public TableRouteResult TableRouteResult { get; }
|
||||
public TResult QueryResult { get; }
|
||||
|
@ -24,11 +25,19 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
DataSourceName = dataSourceName;
|
||||
TableRouteResult = tableRouteResult;
|
||||
QueryResult = queryResult;
|
||||
_hasValue = QueryResult != null;
|
||||
}
|
||||
public RouteQueryResult(string dataSourceName,TableRouteResult tableRouteResult,TResult queryResult,bool hasValue)
|
||||
{
|
||||
_hasValue = hasValue;
|
||||
DataSourceName = dataSourceName;
|
||||
TableRouteResult = tableRouteResult;
|
||||
QueryResult = queryResult;
|
||||
}
|
||||
|
||||
public bool HasQueryResult()
|
||||
{
|
||||
return QueryResult!= null;
|
||||
return _hasValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
|
||||
protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
|
||||
{
|
||||
var notNullResult = resultList.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
|
||||
var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
|
||||
|
||||
if (notNullResult.Count == 0)
|
||||
throw new InvalidOperationException("Sequence on element.");
|
||||
|
|
|
@ -32,7 +32,7 @@ namespace ShardingCore.Sharding.StreamMergeEngines
|
|||
|
||||
protected override TEntity DoMergeResult0(List<RouteQueryResult<TEntity>> resultList)
|
||||
{
|
||||
var notNullResult = resultList.Where(o => o != null && o.QueryResult != null).Select(o => o.QueryResult).ToList();
|
||||
var notNullResult = resultList.Where(o => o != null && o.HasQueryResult()).Select(o => o.QueryResult).ToList();
|
||||
|
||||
if (notNullResult.Count > 1)
|
||||
throw new InvalidOperationException("Sequence contains more than one element.");
|
||||
|
|
Loading…
Reference in New Issue