Spring Cloud LoadBalancer是Spring Cloud官方自己提供的客户端负载均衡器,用来替代Netflix Ribbon。它是一个客户端层的负载均衡器,用于发现、更新和维护服务列表,并自定义服务的均衡负载策略,如随机、轮询、小流量的金丝雀等等。Spring Cloud LoadBalancer提供了自己的客户端负载平衡器抽象和实现,增加了ReactiveLoadBalancer接口,并提供了基于round-robin轮询和Random随机的实现。
官方文档:spring-cloud-loadbalancer
Demo实现
创建客户端应用,并设置服务端口7900,并创建一个rest接口用于调用服务
@RestController
@Slf4j
@RequestMapping("/hello")
public class HelloController {
@Autowired
WebClient.Builder webClientBuilder;
@Autowired
RestTemplate restTemplate;
@GetMapping ("/webclient")
public ResponseEntity webclient(HttpServletRequest request) {
WebClient loadBalancedClient = webClientBuilder.build();
List<String> resp = new ArrayList<>();
for(int i = 1; i <= 10; i++) {
String response =
loadBalancedClient.get().uri("http://instance-server/hello")
.attribute("sessionId",request.getParameter("sessionId"))
.retrieve().toEntity(String.class)
.block().getBody();
resp.add(response);
}
return new ResponseEntity<>(resp,HttpStatusCode.valueOf(200));
}
@GetMapping ("/rest")
public ResponseEntity rest(HttpServletRequest request) {
WebClient loadBalancedClient = webClientBuilder.build();
List<String> resp = new ArrayList<>();
for(int i = 1; i <= 10; i++) {
String response = restTemplate.getForObject("http://instance-server/hello", String.class);
resp.add(response);
}
return new ResponseEntity<>(resp,HttpStatusCode.valueOf(200));
}
}
依赖引入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
ServiceInstanceListSupplier 实现,构建服务实列列表
class DemoInstanceSupplier implements ServiceInstanceListSupplier {
private final String serviceId;
public DemoInstanceSupplier(String serviceId) {
this.serviceId = serviceId;
}
@Override
public String getServiceId() {
return serviceId;
}
@Override
public Flux<List<ServiceInstance>> get() {
return Flux.just(Arrays
.asList(new DefaultServiceInstance(serviceId + "7901", serviceId, "localhost", 7901, false),
new DefaultServiceInstance(serviceId + "7902", serviceId, "localhost", 7902, false)));
}
}
//---------------------------------------------
@Configuration
public class DemoServerInstanceConfiguration {
@Bean
ServiceInstanceListSupplier serviceInstanceListSupplier() {
return new DemoInstanceSupplier("instance-server");
}
}
从nacos上获取服务实例
class NacosInstanceSupplier implements ServiceInstanceListSupplier {
private final String serviceId;
private final DiscoveryClient discoveryClient;
public NacosInstanceSupplier(String serviceId, DiscoveryClient discoveryClient) {
this.serviceId = serviceId;
this.discoveryClient = discoveryClient;
}
@Override
public String getServiceId() {
return serviceId;
}
@Override
public Flux<List<ServiceInstance>> get() {
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceId);
return Flux.just(serviceInstances);
}
}
负载均衡bean配置
@Configuration
@LoadBalancerClient(name = "instance-server", configuration = DemoServerInstanceConfiguration.class)
public class LoadBanlanceConfig {
@LoadBalanced
@Bean
WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = "instance-server";
return new ZBRoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
这里使用自定义负载配置,其他默认配置类有如下类,包括nacos,随机负载均衡
ZBRoundRobinLoadBalancer 实现
这里如果请求中没有sessionId时,维护一个原子整数。相当于轮询。
如果有SessionId,sessionid相同的请求都会路由到同一个服务上。
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + this.serviceId);
}
return new EmptyResponse();
} else {
DefaultRequestContext requestContext = (DefaultRequestContext) request.getContext();
RequestData clientRequest = (RequestData) requestContext.getClientRequest();
String sessionId = (String) clientRequest.getAttributes().get("sessionId");
ServiceInstance instance = null;
if (sessionId == null) {
int pos = this.position.incrementAndGet() & 2147483647;
instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
} else {
if (instanceSession.get(sessionId) != null) {
instance = instances.stream().filter(f -> f.getInstanceId().equals(instanceSession.get(sessionId))).findFirst().get();
} else {
instance= instances.get(this.position.incrementAndGet()%instances.size());
instanceSession.put(sessionId, instance.getInstanceId());
}
}
return new DefaultResponse(instance);
}
}
创建并启动springboot服务instance-server-one 和instance-server-two 端口分别为7901和7902
提供一个基本rest接口,表明该服务被调用
@RestController
@Slf4j
public class HelloController {
@Value("${server.port}")
String port;
@Autowired
WebClient.Builder webClientBuilder;
@RequestMapping("/hello")
public String hello(HttpServletRequest request)
{
return "response from instance-server:"+port;
}
}
访问localhost:7900/hello/webclient ,会进行轮询路由不同的实例
如果代sessionId 则相同的sessionId请求都会路由到同一个服务实列, 如访问 localhost:7900/hello/webclient?sessionId=aa 会全部负载到7901,localhost:7900/hello/webclient?sessionId=bb会负载到7902
原理分析
DefaultWebClientBuilder
ReactorLoadBalancerClientAutoConfiguration
ReactorLoadBalancerExchangeFilterFunction
LoadBalancerWebClientBuilderBeanPostProcessor