1. gateway 增加灰度路由
This commit is contained in:
parent
78c200ca61
commit
f32e43863b
|
@ -0,0 +1,75 @@
|
|||
package cn.iocoder.yudao.gateway.filter.grey;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
|
||||
import com.alibaba.cloud.nacos.balancer.NacosBalancer;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.loadbalancer.*;
|
||||
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
|
||||
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
|
||||
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 灰度 {@link GrayLoadBalancer} 实现类
|
||||
*
|
||||
* 根据请求的 header[version] 匹配,筛选满足 metadata[version] 相等的服务实例列表,然后随机 + 权重进行选择一个
|
||||
* 1. 假如请求的 header[version] 为空,则不进行筛选,所有服务实例都进行选择
|
||||
* 2. 如果 metadata[version] 都不相等,则不进行筛选,所有服务实例都进行选择
|
||||
*
|
||||
* 注意,考虑到实现的简易,它的权重是使用 Nacos 的 nacos.weight,所以随机 + 权重也是基于 {@link NacosBalancer} 筛选。
|
||||
* 也就是说,如果你不使用 Nacos 作为注册中心,需要微调一下筛选的实现逻辑
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class GrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {
|
||||
|
||||
private static final String VERSION = "version";
|
||||
|
||||
private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider; // 用于获取 serviceId 对应的服务实例的列表
|
||||
private final String serviceId; // 服务名,暂时用于打印 logger 日志
|
||||
|
||||
@Override
|
||||
public Mono<Response<ServiceInstance>> choose(Request request) {
|
||||
// 获得 HttpHeaders 属性,实现从 header 中获取 version
|
||||
HttpHeaders headers = ((RequestDataContext) request.getContext()).getClientRequest().getHeaders();
|
||||
// 选择实例
|
||||
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
|
||||
return supplier.get(request).next().map(list -> getInstanceResponse(list, headers));
|
||||
}
|
||||
|
||||
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
|
||||
// 如果服务实例为空,则直接返回
|
||||
if (CollUtil.isEmpty(instances)) {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn("[getInstanceResponse][serviceId({}) 服务实例列表为空]", serviceId);
|
||||
}
|
||||
return new EmptyResponse();
|
||||
}
|
||||
|
||||
// 筛选满足条件的实例列表
|
||||
String version = headers.getFirst(VERSION);
|
||||
List<ServiceInstance> chooseInstances;
|
||||
if (StrUtil.isEmpty(version)) {
|
||||
chooseInstances = instances;
|
||||
} else {
|
||||
chooseInstances = CollectionUtils.filterList(instances, instance -> version.equals(instance.getMetadata().get("version")));
|
||||
if (CollUtil.isEmpty(chooseInstances)) {
|
||||
log.warn("[getInstanceResponse][serviceId({}) 没有满足版本的服务实例列表,直接使用所有服务实例列表]", serviceId);
|
||||
chooseInstances = instances;
|
||||
}
|
||||
}
|
||||
|
||||
// 随机 + 权重获取实例列表 TODO 芋艿:目前直接使用 Nacos 提供的方法,如果替换注册中心,需要重新失败该方法
|
||||
return new DefaultResponse(NacosBalancer.getHostByRandomWeight3(chooseInstances));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
package cn.iocoder.yudao.gateway.filter.grey;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.loadbalancer.*;
|
||||
import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
|
||||
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
|
||||
import org.springframework.cloud.gateway.support.NotFoundException;
|
||||
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
|
||||
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
|
||||
|
||||
/**
|
||||
* 支持灰度功能的 {@link ReactiveLoadBalancerClientFilter} 实现类
|
||||
*
|
||||
* 由于 {@link ReactiveLoadBalancerClientFilter#choose(Request, String, Set)} 是 private 方法,无法进行重写。
|
||||
* 因此,这里只好 copy 它所有的代码,手动重写 choose 方法
|
||||
*
|
||||
* 具体的使用与实现原理,可阅读如下两个文章:
|
||||
* 1. https://www.jianshu.com/p/6db15bc0be8f
|
||||
* 2. https://cloud.tencent.com/developer/article/1620795
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
@SuppressWarnings({"JavadocReference", "rawtypes", "unchecked", "ConstantConditions"})
|
||||
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
|
||||
|
||||
private final LoadBalancerClientFactory clientFactory;
|
||||
|
||||
private final GatewayLoadBalancerProperties properties;
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return ReactiveLoadBalancerClientFilter.LOAD_BALANCER_CLIENT_FILTER_ORDER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
|
||||
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
|
||||
// 修改 by 芋道源码:将 lb 替换成 grayLb,表示灰度负载均衡
|
||||
if (url == null || (!"grayLb".equals(url.getScheme()) && !"grayLb".equals(schemePrefix))) {
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
// preserve the original url
|
||||
addOriginalRequestUrl(exchange, url);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
|
||||
}
|
||||
|
||||
URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
|
||||
String serviceId = requestUri.getHost();
|
||||
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
|
||||
.getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
|
||||
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
|
||||
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
|
||||
new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));
|
||||
return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
|
||||
|
||||
if (!response.hasServer()) {
|
||||
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
|
||||
.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
|
||||
throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
|
||||
}
|
||||
|
||||
ServiceInstance retrievedInstance = response.getServer();
|
||||
|
||||
URI uri = exchange.getRequest().getURI();
|
||||
|
||||
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
|
||||
// if the loadbalancer doesn't provide one.
|
||||
String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
|
||||
if (schemePrefix != null) {
|
||||
overrideScheme = url.getScheme();
|
||||
}
|
||||
|
||||
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
|
||||
overrideScheme);
|
||||
|
||||
URI requestUrl = reconstructURI(serviceInstance, uri);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
|
||||
}
|
||||
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
|
||||
exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
|
||||
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
|
||||
}).then(chain.filter(exchange))
|
||||
.doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
|
||||
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
|
||||
CompletionContext.Status.FAILED, throwable, lbRequest,
|
||||
exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
|
||||
.doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
|
||||
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
|
||||
CompletionContext.Status.SUCCESS, lbRequest,
|
||||
exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
|
||||
new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
|
||||
}
|
||||
|
||||
protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
|
||||
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
|
||||
}
|
||||
|
||||
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
|
||||
Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
|
||||
// 修改 by 芋道源码:直接创建 GrayLoadBalancer 对象
|
||||
GrayLoadBalancer loadBalancer = new GrayLoadBalancer(
|
||||
clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
|
||||
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
|
||||
return loadBalancer.choose(lbRequest);
|
||||
}
|
||||
|
||||
private String getHint(String serviceId) {
|
||||
LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
|
||||
Map<String, String> hints = loadBalancerProperties.getHint();
|
||||
String defaultHint = hints.getOrDefault("default", "default");
|
||||
String hintPropertyValue = hints.get(serviceId);
|
||||
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package cn.iocoder.yudao.gateway.filter;
|
||||
package cn.iocoder.yudao.gateway.filter.security;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
|
@ -5,10 +5,10 @@ spring:
|
|||
# 路由配置项,对应 RouteDefinition 数组
|
||||
routes:
|
||||
- id: system-admin-api # 路由的编号
|
||||
uri: lb://system-server
|
||||
uri: grayLb://system-server
|
||||
predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组
|
||||
- Path=/admin-api/system/**
|
||||
- id: system-app-api # 路由的编号
|
||||
uri: lb://system-server
|
||||
uri: grayLb://system-server
|
||||
predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组
|
||||
- Path=/app-api/system/**
|
||||
|
|
|
@ -6,6 +6,8 @@ spring:
|
|||
server-addr: 127.0.0.1:8848
|
||||
discovery:
|
||||
namespace: dev # 命名空间。这里使用 dev 开发环境
|
||||
metadata:
|
||||
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
|
||||
|
||||
--- #################### 配置中心相关配置 ####################
|
||||
|
||||
|
|
Loading…
Reference in New Issue