From e487d5e21cf3e834c332e0582c72f6f1298fc7d4 Mon Sep 17 00:00:00 2001 From: xuejiaming <326308290@qq.com> Date: Wed, 13 Sep 2023 13:54:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=88=86=E7=89=87=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E7=BB=A7=E6=89=BF=E5=AF=BC=E8=87=B4=E7=9A=84=E4=B8=8D?= =?UTF-8?q?=E8=B5=B0=E9=AB=98=E6=80=A7=E8=83=BD=E5=88=86=E9=A1=B5=E7=9A=84?= =?UTF-8?q?bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Controllers/AsyncBatchService.cs | 20 ++++ .../Controllers/WeatherForecastController.cs | 27 +++++- .../DbContexts/DefaultShardingDbContext.cs | 9 ++ samples/Sample.MySql/MyJob.cs | 39 ++++++++ .../Shardings/SysUserModVirtualTableRoute.cs | 12 +++ samples/Sample.MySql/Startup.cs | 23 +++-- .../EnumeratorStreamMergeEngineFactory.cs | 92 +++++++++++++------ 7 files changed, 181 insertions(+), 41 deletions(-) create mode 100644 samples/Sample.MySql/Controllers/AsyncBatchService.cs create mode 100644 samples/Sample.MySql/MyJob.cs diff --git a/samples/Sample.MySql/Controllers/AsyncBatchService.cs b/samples/Sample.MySql/Controllers/AsyncBatchService.cs new file mode 100644 index 00000000..9bc6581e --- /dev/null +++ b/samples/Sample.MySql/Controllers/AsyncBatchService.cs @@ -0,0 +1,20 @@ +using Sample.MySql.DbContexts; + +namespace Sample.MySql.Controllers; + +public class AsyncBatchService +{ + private readonly DefaultShardingDbContext _dbContext; + + public AsyncBatchService(DefaultShardingDbContext dbContext) + { + _dbContext = dbContext; + } + + + public void SaveInOtherThread(List list) + { + _dbContext.AddRangeAsync(list); + _dbContext.SaveChanges(); + } +} \ No newline at end of file diff --git a/samples/Sample.MySql/Controllers/WeatherForecastController.cs b/samples/Sample.MySql/Controllers/WeatherForecastController.cs index 2b0e7247..8f2d3966 100644 --- a/samples/Sample.MySql/Controllers/WeatherForecastController.cs +++ b/samples/Sample.MySql/Controllers/WeatherForecastController.cs @@ -61,13 +61,15 @@ namespace Sample.MySql.Controllers [Route("[controller]/[action]")] public class WeatherForecastController : ControllerBase { + private readonly IServiceProvider _serviceProvider; private readonly UnShardingDbContext _unShardingDbContext; private readonly DefaultShardingDbContext _defaultTableDbContext; private readonly IShardingRuntimeContext _shardingRuntimeContext; private readonly ABC _abc; - public WeatherForecastController(UnShardingDbContext unShardingDbContext,DefaultShardingDbContext defaultTableDbContext,IShardingRuntimeContext shardingRuntimeContext) + public WeatherForecastController(IServiceProvider serviceProvider,UnShardingDbContext unShardingDbContext,DefaultShardingDbContext defaultTableDbContext,IShardingRuntimeContext shardingRuntimeContext) { + _serviceProvider = serviceProvider; _unShardingDbContext = unShardingDbContext; _defaultTableDbContext = defaultTableDbContext; _shardingRuntimeContext = shardingRuntimeContext; @@ -126,7 +128,7 @@ namespace Sample.MySql.Controllers // // } var dateTime = new DateTime(2021,1,1); - var x211 = await (from ut in _defaultTableDbContext.Set() + var x211 = await (from ut in _defaultTableDbContext.Set().UseUnionAllMerge() join uu in _defaultTableDbContext.Set() on ut.Id equals uu.Id where uu.Time > dateTime @@ -218,6 +220,10 @@ namespace Sample.MySql.Controllers .Select(o => new ssss(){ Id = o.Id, C = GetAll().Count(x => x.Id == o.Id) }).ToList(); var sysTests = GetAll(); var sysUserMods3 = _defaultTableDbContext.Set() + // .AsRoute(op=> + // { + // op.TryCreateOrAddMustTail(new[] { "00" }); + // }) .Select(o => new ssss(){ Id = o.Id, C = sysTests.Count(x => x.Id == o.Id) }).ToList(); var resultX = await _defaultTableDbContext.Set() .Where(o => o.Id == "2" || o.Id == "3").FirstOrDefaultAsync(); @@ -446,5 +452,22 @@ namespace Sample.MySql.Controllers // var sysUserMods2 = await _defaultTableDbContext.Set().FromSqlRaw("select * from SysTest where id='2'").ToListAsync(); return Ok(); } + + + // public void batachSave() + // { + // var objects = new List>(); + // foreach (List o in objects) + // { + // Task.Run(()=> + // { + // using (var serviceScope = _serviceProvider.CreateScope()) + // { + // var asyncBatchService = serviceScope.ServiceProvider.GetService(); + // asyncBatchService.SaveInOtherThread(o); + // } + // }); + // } + // } } } diff --git a/samples/Sample.MySql/DbContexts/DefaultShardingDbContext.cs b/samples/Sample.MySql/DbContexts/DefaultShardingDbContext.cs index 110c2ab3..b5e3e3ed 100644 --- a/samples/Sample.MySql/DbContexts/DefaultShardingDbContext.cs +++ b/samples/Sample.MySql/DbContexts/DefaultShardingDbContext.cs @@ -23,6 +23,15 @@ namespace Sample.MySql.DbContexts public DefaultShardingDbContext(DbContextOptions options) : base(options) { + var key = options.Extensions + .OrderBy(e => e.GetType().Name) + .Select(o => + { + Console.WriteLine(o.GetType().Name); + return o; + }) + .Aggregate(0L, (t, e) => (t * 397) ^ ((long)e.GetType().GetHashCode() * 397) ^ e.Info.GetServiceProviderHashCode()); + Console.WriteLine("key:"+key); //切记不要在构造函数中使用会让模型提前创建的方法 //ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; //Database.SetCommandTimeout(30000); diff --git a/samples/Sample.MySql/MyJob.cs b/samples/Sample.MySql/MyJob.cs new file mode 100644 index 00000000..007b4e20 --- /dev/null +++ b/samples/Sample.MySql/MyJob.cs @@ -0,0 +1,39 @@ +using Microsoft.EntityFrameworkCore; +using Sample.MySql.DbContexts; +using ShardingCore; +using ShardingCore.Core.RuntimeContexts; + +namespace Sample.MySql; + +public class MyJob :IHostedService +{ + private readonly IServiceProvider _serviceProvider; + private readonly IShardingRuntimeContext _shardingRuntimeContext; + + public MyJob(IServiceProvider serviceProvider,IShardingRuntimeContext shardingRuntimeContext) + { + _serviceProvider = serviceProvider; + _shardingRuntimeContext = shardingRuntimeContext; + } + public Task StartAsync(CancellationToken cancellationToken) + { + // using (var serviceScope = _serviceProvider.CreateScope()) + // { + // var defaultShardingDbContext = serviceScope.ServiceProvider.GetService(); + // } + // + // var dbContextOptionsBuilder = new DbContextOptionsBuilder(); + // dbContextOptionsBuilder.UseSharding(_shardingRuntimeContext); + // using (var dbcontext = new DefaultShardingDbContext(dbContextOptionsBuilder.Options)) + // { + // + // } + + throw new NotImplementedException(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/samples/Sample.MySql/Shardings/SysUserModVirtualTableRoute.cs b/samples/Sample.MySql/Shardings/SysUserModVirtualTableRoute.cs index 2c2657df..561971b9 100644 --- a/samples/Sample.MySql/Shardings/SysUserModVirtualTableRoute.cs +++ b/samples/Sample.MySql/Shardings/SysUserModVirtualTableRoute.cs @@ -1,5 +1,7 @@ using Sample.MySql.Domain.Entities; using ShardingCore.Core.EntityMetadatas; +using ShardingCore.Core.VirtualRoutes; +using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine; using ShardingCore.VirtualRoutes.Mods; namespace Sample.MySql.Shardings @@ -16,10 +18,20 @@ namespace Sample.MySql.Shardings { } + public override void Configure(EntityMetadataTableBuilder builder) { builder.ShardingProperty(o => o.Id); } + // protected override List AfterShardingRouteUnitFilter(DataSourceRouteResult dataSourceRouteResult, List shardingRouteUnits) + // { + // //拦截 + // if (shardingRouteUnits.Count > 10) + // { + // return shardingRouteUnits.Take(10).ToList(); + // } + // return base.AfterShardingRouteUnitFilter(dataSourceRouteResult, shardingRouteUnits); + // } } } \ No newline at end of file diff --git a/samples/Sample.MySql/Startup.cs b/samples/Sample.MySql/Startup.cs index 2e8a9524..ad5314e2 100644 --- a/samples/Sample.MySql/Startup.cs +++ b/samples/Sample.MySql/Startup.cs @@ -130,24 +130,27 @@ namespace Sample.MySql // o.UseEntityFrameworkCoreProxies = true; o.ThrowIfQueryRouteNotMatch = false; o.AutoUseWriteConnectionStringAfterWriteDb = true; - + o.UseExecutorDbContextConfigure(op => + { + + }); o.UseShardingQuery((conStr, builder) => { - var logger = sp.ApplicationServiceProvider.GetService>(); - logger.LogInformation(conStr); - builder.UseMySql(conStr, new MySqlServerVersion(new Version())) + // var logger = sp.ApplicationServiceProvider.GetService>(); + // logger.LogInformation(conStr); + builder.UseMySql(conStr, new MySqlServerVersion(new Version())); // .UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) - .UseLoggerFactory(loggerFactory1) - .EnableSensitiveDataLogging(); + // .UseLoggerFactory(loggerFactory1) + // .EnableSensitiveDataLogging(); //.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking); }); o.UseShardingTransaction((connection, builder) => { builder - .UseMySql(connection, new MySqlServerVersion(new Version())) - // .UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) - .UseLoggerFactory(loggerFactory1) - .EnableSensitiveDataLogging(); + .UseMySql(connection, new MySqlServerVersion(new Version())); + // .UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking) + // .UseLoggerFactory(loggerFactory1) + // .EnableSensitiveDataLogging(); //.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking); }); o.AddDefaultDataSource("ds0", diff --git a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs index 2fd65aff..6a8689f3 100644 --- a/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs +++ b/src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/EnumeratorStreamMergeEngineFactory.cs @@ -34,11 +34,12 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines private readonly ITableRouteManager _tableRouteManager; private readonly IEntityMetadataManager _entityMetadataManager; private readonly IDataSourceRouteManager _dataSourceRouteManager; + private EnumeratorStreamMergeEngineFactory(StreamMergeContext streamMergeContext) { _streamMergeContext = streamMergeContext; _shardingPageManager = streamMergeContext.ShardingRuntimeContext.GetShardingPageManager(); - _tableRouteManager =streamMergeContext.ShardingRuntimeContext.GetTableRouteManager(); + _tableRouteManager = streamMergeContext.ShardingRuntimeContext.GetTableRouteManager(); _entityMetadataManager = streamMergeContext.ShardingRuntimeContext.GetEntityMetadataManager(); _dataSourceRouteManager = streamMergeContext.ShardingRuntimeContext.GetDataSourceRouteManager(); } @@ -52,6 +53,7 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines { return _dataSourceRouteManager.GetRoute(entityType); } + public IStreamEnumerable GetStreamEnumerable() { if (_streamMergeContext.IsRouteNotMatch()) @@ -84,7 +86,8 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines } //未开启系统分表或者本次查询涉及多张分表 - if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSingleShardingEntityQuery() && _shardingPageManager.Current != null) + if (_streamMergeContext.IsPaginationQuery() && _streamMergeContext.IsSingleShardingEntityQuery() && + _shardingPageManager.Current != null) { //获取虚拟表判断是否启用了分页配置 var shardingEntityType = _streamMergeContext.GetSingleShardingEntityType(); @@ -114,7 +117,6 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines private IStreamEnumerable DoNoOrderAppendEnumeratorStreamMergeEngine(Type shardingEntityType) { - var isShardingDataSource = _entityMetadataManager.IsShardingDataSource(shardingEntityType); var isShardingTable = _entityMetadataManager.IsShardingTable(shardingEntityType); PaginationSequenceConfig dataSourceSequenceOrderConfig = null; @@ -124,28 +126,36 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines var virtualDataSourceRoute = GetRoute(shardingEntityType); if (virtualDataSourceRoute.EnablePagination) { - dataSourceSequenceOrderConfig = virtualDataSourceRoute.PaginationMetadata.PaginationConfigs.OrderByDescending(o => o.AppendOrder) - .FirstOrDefault(o => o.AppendIfOrderNone && typeof(TEntity).ContainPropertyName(o.PropertyName)); + dataSourceSequenceOrderConfig = virtualDataSourceRoute.PaginationMetadata.PaginationConfigs + .OrderByDescending(o => o.AppendOrder) + .FirstOrDefault(o => + o.AppendIfOrderNone && typeof(TEntity).ContainPropertyName(o.PropertyName)); } - } + if (isShardingTable) { var tableRoute = _tableRouteManager.GetRoute(shardingEntityType); if (tableRoute.EnablePagination) { - tableSequenceOrderConfig = tableRoute.PaginationMetadata.PaginationConfigs.OrderByDescending(o => o.AppendOrder) - .FirstOrDefault(o => o.AppendIfOrderNone && typeof(TEntity).ContainPropertyName(o.PropertyName)); + tableSequenceOrderConfig = tableRoute.PaginationMetadata.PaginationConfigs + .OrderByDescending(o => o.AppendOrder) + .FirstOrDefault(o => + o.AppendIfOrderNone && typeof(TEntity).ContainPropertyName(o.PropertyName)); } } var useSequenceEnumeratorMergeEngine = isShardingDataSource && (dataSourceSequenceOrderConfig != null || (isShardingTable && - !_streamMergeContext.IsCrossDataSource)) || (!isShardingDataSource && isShardingTable && tableSequenceOrderConfig != null); + !_streamMergeContext.IsCrossDataSource)) || + (!isShardingDataSource && isShardingTable && + tableSequenceOrderConfig != null); if (useSequenceEnumeratorMergeEngine) { - return new AppendOrderSequenceShardingEnumerable(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults); + return new AppendOrderSequenceShardingEnumerable(_streamMergeContext, + dataSourceSequenceOrderConfig, tableSequenceOrderConfig, + _shardingPageManager.Current.RouteQueryResults); } @@ -154,7 +164,6 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines private IStreamEnumerable DoOrderSequencePaginationEnumeratorStreamMergeEngine(Type shardingEntityType) { - var orderCount = _streamMergeContext.Orders.Count(); var primaryOrder = _streamMergeContext.Orders.First(); var isShardingDataSource = _entityMetadataManager.IsShardingDataSource(shardingEntityType); @@ -170,33 +179,44 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines virtualDataSourceRoute = GetRoute(shardingEntityType); if (virtualDataSourceRoute.EnablePagination) { - dataSourceSequenceOrderConfig = orderCount == 1 ? GetPaginationFullMatch(virtualDataSourceRoute.PaginationMetadata.PaginationConfigs, primaryOrder) : GetPaginationPrimaryMatch(virtualDataSourceRoute.PaginationMetadata.PaginationConfigs, primaryOrder); + dataSourceSequenceOrderConfig = orderCount == 1 + ? GetPaginationFullMatch(virtualDataSourceRoute.PaginationMetadata.PaginationConfigs, + primaryOrder) + : GetPaginationPrimaryMatch(virtualDataSourceRoute.PaginationMetadata.PaginationConfigs, + primaryOrder); } - } + if (isShardingTable) { tableRoute = _tableRouteManager.GetRoute(shardingEntityType); if (tableRoute.EnablePagination) { - tableSequenceOrderConfig = orderCount == 1 ? GetPaginationFullMatch(tableRoute.PaginationMetadata.PaginationConfigs, primaryOrder) : GetPaginationPrimaryMatch(tableRoute.PaginationMetadata.PaginationConfigs, primaryOrder); + tableSequenceOrderConfig = orderCount == 1 + ? GetPaginationFullMatch(tableRoute.PaginationMetadata.PaginationConfigs, primaryOrder) + : GetPaginationPrimaryMatch(tableRoute.PaginationMetadata.PaginationConfigs, primaryOrder); } } var useSequenceEnumeratorMergeEngine = isShardingDataSource && (dataSourceSequenceOrderConfig != null || (isShardingTable && - !_streamMergeContext.IsCrossDataSource)) || (!isShardingDataSource && isShardingTable && tableSequenceOrderConfig != null); + !_streamMergeContext.IsCrossDataSource)) || + (!isShardingDataSource && isShardingTable && + tableSequenceOrderConfig != null); if (useSequenceEnumeratorMergeEngine) { - return new SequenceShardingEnumerable(_streamMergeContext, dataSourceSequenceOrderConfig, tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc); + return new SequenceShardingEnumerable(_streamMergeContext, dataSourceSequenceOrderConfig, + tableSequenceOrderConfig, _shardingPageManager.Current.RouteQueryResults, primaryOrder.IsAsc); } var total = _shardingPageManager.Current.RouteQueryResults.Sum(o => o.QueryResult); if (isShardingDataSource) { dataSourceUseReverse = - virtualDataSourceRoute.EnablePagination && EntityDataSourceUseReverseShardingPage(virtualDataSourceRoute, total); + virtualDataSourceRoute.EnablePagination && + EntityDataSourceUseReverseShardingPage(virtualDataSourceRoute, total); } + if (isShardingTable) { tableUseReverse = @@ -211,51 +231,65 @@ namespace ShardingCore.Sharding.MergeEngines.EnumeratorStreamMergeEngines } - - - return null; } private bool EntityDataSourceUseReverseShardingPage(IVirtualDataSourceRoute virtualDataSourceRoute, long total) { - if (virtualDataSourceRoute.PaginationMetadata.EnableReverseShardingPage && _streamMergeContext.Take.GetValueOrDefault() > 0) + if (virtualDataSourceRoute.PaginationMetadata.EnableReverseShardingPage && + _streamMergeContext.Take.GetValueOrDefault() > 0) { - if (virtualDataSourceRoute.PaginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total)) + if (virtualDataSourceRoute.PaginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), + total)) { return true; } } + return false; } + private bool EntityTableReverseShardingPage(IVirtualTableRoute tableRoute, long total) { - if (tableRoute.PaginationMetadata.EnableReverseShardingPage && _streamMergeContext.Take.GetValueOrDefault() > 0) + if (tableRoute.PaginationMetadata.EnableReverseShardingPage && + _streamMergeContext.Take.GetValueOrDefault() > 0) { if (tableRoute.PaginationMetadata.IsUseReverse(_streamMergeContext.Skip.GetValueOrDefault(), total)) { return true; } } + return false; } - private PaginationSequenceConfig GetPaginationFullMatch(ISet paginationSequenceConfigs, PropertyOrder primaryOrder) + private PaginationSequenceConfig GetPaginationFullMatch( + ISet paginationSequenceConfigs, PropertyOrder primaryOrder) { return paginationSequenceConfigs.FirstOrDefault(o => PaginationPrimaryMatch(o, primaryOrder)); } - private PaginationSequenceConfig GetPaginationPrimaryMatch(ISet paginationSequenceConfigs, PropertyOrder primaryOrder) + + private PaginationSequenceConfig GetPaginationPrimaryMatch( + ISet paginationSequenceConfigs, PropertyOrder primaryOrder) { - return paginationSequenceConfigs.Where(o => o.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch)).FirstOrDefault(o => PaginationPrimaryMatch(o, primaryOrder)); + return paginationSequenceConfigs.Where(o => o.PaginationMatchEnum.HasFlag(PaginationMatchEnum.PrimaryMatch)) + .FirstOrDefault(o => PaginationPrimaryMatch(o, primaryOrder)); } - private bool PaginationPrimaryMatch(PaginationSequenceConfig paginationSequenceConfig, PropertyOrder propertyOrder) + private bool PaginationPrimaryMatch(PaginationSequenceConfig paginationSequenceConfig, + PropertyOrder propertyOrder) { if (propertyOrder.PropertyExpression != paginationSequenceConfig.PropertyName) return false; - if (paginationSequenceConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Owner)) - return _streamMergeContext.GetSingleShardingEntityType() == paginationSequenceConfig.OrderPropertyInfo.DeclaringType; + { + var singleShardingEntityType = _streamMergeContext.GetSingleShardingEntityType(); + return paginationSequenceConfig.OrderPropertyInfo.DeclaringType != null && + (paginationSequenceConfig.OrderPropertyInfo.DeclaringType == singleShardingEntityType || + paginationSequenceConfig.OrderPropertyInfo.DeclaringType.IsAssignableFrom( + singleShardingEntityType)); + } + if (paginationSequenceConfig.PaginationMatchEnum.HasFlag(PaginationMatchEnum.Named)) return propertyOrder.PropertyExpression == paginationSequenceConfig.PropertyName; return false;