序
本文主要研究一下spring cloud的AbstractLoadBalancingClient
AbstractLoadBalancingClient
spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/support/AbstractLoadBalancingClient.java
/** * @author Spencer Gibb */public abstract class AbstractLoadBalancingClientextends AbstractLoadBalancerAwareClientimplements ServiceInstanceChooser { protected int connectTimeout; protected int readTimeout; protected boolean secure; protected boolean followRedirects; protected boolean okToRetryOnAllOperations; protected final D delegate; protected final IClientConfig config; protected final ServerIntrospector serverIntrospector; public boolean isClientRetryable(ContextAwareRequest request) { return false; } protected AbstractLoadBalancingClient(IClientConfig config, ServerIntrospector serverIntrospector) { super(null); this.delegate = createDelegate(config); this.config = config; this.serverIntrospector = serverIntrospector; this.setRetryHandler(RetryHandler.DEFAULT); initWithNiwsConfig(config); } protected AbstractLoadBalancingClient(D delegate, IClientConfig config, ServerIntrospector serverIntrospector) { super(null); this.delegate = delegate; this.config = config; this.serverIntrospector = serverIntrospector; this.setRetryHandler(RetryHandler.DEFAULT); initWithNiwsConfig(config); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); RibbonProperties ribbon = RibbonProperties.from(clientConfig); this.connectTimeout = ribbon.connectTimeout(DEFAULT_CONNECT_TIMEOUT); this.readTimeout = ribbon.readTimeout(DEFAULT_READ_TIMEOUT); this.secure = ribbon.isSecure(); this.followRedirects = ribbon.isFollowRedirects(); this.okToRetryOnAllOperations = ribbon.isOkToRetryOnAllOperations(); } protected abstract D createDelegate(IClientConfig config); public D getDelegate() { return this.delegate; } @Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler( final S request, final IClientConfig requestConfig) { if (this.okToRetryOnAllOperations) { return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig); } if (!request.getContext().getMethod().equals("GET")) { return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig); } else { return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig); } } protected boolean isSecure(final IClientConfig config) { if(config != null) { return RibbonProperties.from(config).isSecure(this.secure); } return this.secure; } @Override protected void customizeLoadBalancerCommandBuilder(S request, IClientConfig config, LoadBalancerCommand.Builderbuilder) { if (request.getLoadBalancerKey() != null) { builder.withServerLocator(request.getLoadBalancerKey()); } } @Override public ServiceInstance choose(String serviceId) { Server server = this.getLoadBalancer().chooseServer(serviceId); if (server != null) { return new RibbonLoadBalancerClient.RibbonServer(serviceId, server); } return null; } public void validateServiceInstance(ServiceInstance serviceInstance) throws ClientException { if (serviceInstance == null) { throw new ClientException("Load balancer does not have available server for client: " + clientName); } else if (serviceInstance.getHost() == null) { throw new ClientException("Invalid Server for: " + serviceInstance.getServiceId() + " null Host"); } }}
- AbstractLoadBalancingClient继承了AbstractLoadBalancerAwareClient,实现了ServiceInstanceChooser接口
- 它有两个基本的实现,分别是使用apache httpclient的RibbonLoadBalancingHttpClient,以及使用okhttp的OkHttpLoadBalancingClient
RibbonLoadBalancingHttpClient
spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/RibbonLoadBalancingHttpClient.java
/** * @author Christian Lohmann * @author Ryan Baxter * @author Tim Ysewyn */// TODO: rename (ie new class that extends this in Dalston) to ApacheHttpLoadBalancingClientpublic class RibbonLoadBalancingHttpClient extends AbstractLoadBalancingClient{ public RibbonLoadBalancingHttpClient(IClientConfig config, ServerIntrospector serverIntrospector) { super(config, serverIntrospector); } public RibbonLoadBalancingHttpClient(CloseableHttpClient delegate, IClientConfig config, ServerIntrospector serverIntrospector) { super(delegate, config, serverIntrospector); } protected CloseableHttpClient createDelegate(IClientConfig config) { RibbonProperties ribbon = RibbonProperties.from(config); return HttpClientBuilder.create() // already defaults to 0 in builder, so resetting to 0 won't hurt .setMaxConnTotal(ribbon.maxTotalConnections(0)) // already defaults to 0 in builder, so resetting to 0 won't hurt .setMaxConnPerRoute(ribbon.maxConnectionsPerHost(0)) .disableCookieManagement().useSystemProperties() // for proxy .build(); } @Override public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { IClientConfig config = configOverride != null ? configOverride : this.config; RibbonProperties ribbon = RibbonProperties.from(config); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout)) .setSocketTimeout(ribbon.readTimeout(this.readTimeout)) .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects)) .build(); request = getSecureRequest(request, configOverride); final HttpUriRequest httpUriRequest = request.toRequest(requestConfig); final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } @Override public URI reconstructURIWithServer(Server server, URI original) { URI uri = updateToSecureConnectionIfNeeded(original, this.config, this.serverIntrospector, server); return super.reconstructURIWithServer(server, uri); } @Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler( RibbonApacheHttpRequest request, IClientConfig requestConfig) { return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, requestConfig); } protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) { if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(request.getUri()) .scheme("https").build(true).toUri(); return request.withNewUri(secureUri); } return request; }}
- 这个类被命名为ApacheHttpLoadBalancingClient更好一点
- execute方法将RibbonApacheHttpRequest转换为HttpUriRequest,然后使用apache httpclient进行请求
OkHttpLoadBalancingClient
spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/OkHttpLoadBalancingClient.java
/** * @author Spencer Gibb * @author Ryan Baxter * @author Tim Ysewyn */public class OkHttpLoadBalancingClient extends AbstractLoadBalancingClient{ public OkHttpLoadBalancingClient(IClientConfig config, ServerIntrospector serverIntrospector) { super(config, serverIntrospector); } public OkHttpLoadBalancingClient(OkHttpClient delegate, IClientConfig config, ServerIntrospector serverIntrospector) { super(delegate, config, serverIntrospector); } @Override protected OkHttpClient createDelegate(IClientConfig config) { return new OkHttpClient(); } @Override public OkHttpRibbonResponse execute(OkHttpRibbonRequest ribbonRequest, final IClientConfig configOverride) throws Exception { boolean secure = isSecure(configOverride); if (secure) { final URI secureUri = UriComponentsBuilder.fromUri(ribbonRequest.getUri()) .scheme("https").build().toUri(); ribbonRequest = ribbonRequest.withNewUri(secureUri); } OkHttpClient httpClient = getOkHttpClient(configOverride, secure); final Request request = ribbonRequest.toRequest(); Response response = httpClient.newCall(request).execute(); return new OkHttpRibbonResponse(response, ribbonRequest.getUri()); } OkHttpClient getOkHttpClient(IClientConfig configOverride, boolean secure) { IClientConfig config = configOverride != null ? configOverride : this.config; RibbonProperties ribbon = RibbonProperties.from(config); OkHttpClient.Builder builder = this.delegate.newBuilder() .connectTimeout(ribbon.connectTimeout(this.connectTimeout), TimeUnit.MILLISECONDS) .readTimeout(ribbon.readTimeout(this.readTimeout), TimeUnit.MILLISECONDS) .followRedirects(ribbon.isFollowRedirects(this.followRedirects)); if (secure) { builder.followSslRedirects(ribbon.isFollowRedirects(this.followRedirects)); } return builder.build(); } @Override public URI reconstructURIWithServer(Server server, URI original) { URI uri = updateToSecureConnectionIfNeeded(original, this.config, this.serverIntrospector, server); return super.reconstructURIWithServer(server, uri); }}
- OkHttpLoadBalancingClient里头根据IClientConfig构建OkHttpClient,然后再将响应包装为OkHttpRibbonResponse
配置
HttpClientRibbonConfiguration
spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/HttpClientRibbonConfiguration.java
@Configuration@ConditionalOnClass(name = "org.apache.http.client.HttpClient")@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)public class HttpClientRibbonConfiguration { @RibbonClientName private String name = "client"; @Configuration protected static class ApacheHttpClientConfiguration { private final Timer connectionManagerTimer = new Timer( "RibbonApacheHttpClientConfiguration.connectionManagerTimer", true); private CloseableHttpClient httpClient; @Autowired(required = false) private RegistryBuilder registryBuilder; @Bean @ConditionalOnMissingBean(HttpClientConnectionManager.class) public HttpClientConnectionManager httpClientConnectionManager( IClientConfig config, ApacheHttpClientConnectionManagerFactory connectionManagerFactory) { RibbonProperties ribbon = RibbonProperties.from(config); int maxTotalConnections = ribbon.maxTotalConnections(); int maxConnectionsPerHost = ribbon.maxConnectionsPerHost(); int timerRepeat = ribbon.connectionCleanerRepeatInterval(); long timeToLive = ribbon.poolKeepAliveTime(); TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits(); final HttpClientConnectionManager connectionManager = connectionManagerFactory .newConnectionManager(false, maxTotalConnections, maxConnectionsPerHost, timeToLive, ttlUnit, registryBuilder); this.connectionManagerTimer.schedule(new TimerTask() { @Override public void run() { connectionManager.closeExpiredConnections(); } }, 30000, timerRepeat); return connectionManager; } @Bean @ConditionalOnMissingBean(CloseableHttpClient.class) public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory, HttpClientConnectionManager connectionManager, IClientConfig config) { RibbonProperties ribbon = RibbonProperties.from(config); Boolean followRedirects = ribbon.isFollowRedirects(); Integer connectTimeout = ribbon.connectTimeout(); RequestConfig defaultRequestConfig = RequestConfig.custom() .setConnectTimeout(connectTimeout) .setRedirectsEnabled(followRedirects).build(); this.httpClient = httpClientFactory.createBuilder(). setDefaultRequestConfig(defaultRequestConfig). setConnectionManager(connectionManager).build(); return httpClient; } @PreDestroy public void destroy() throws Exception { connectionManagerTimer.cancel(); if(httpClient != null) { httpClient.close(); } } } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate") public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) { RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate") public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) { RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient( httpClient, config, serverIntrospector, loadBalancedRetryFactory); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; }}
- 配置的是RibbonLoadBalancingHttpClient,如果有retry,则是RetryableRibbonLoadBalancingHttpClient
OkHttpRibbonConfiguration
spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/OkHttpRibbonConfiguration.java
@Configuration@ConditionalOnProperty("ribbon.okhttp.enabled")@ConditionalOnClass(name = "okhttp3.OkHttpClient")public class OkHttpRibbonConfiguration { @RibbonClientName private String name = "client"; @Configuration protected static class OkHttpClientConfiguration { private OkHttpClient httpClient; @Bean @ConditionalOnMissingBean(ConnectionPool.class) public ConnectionPool httpClientConnectionPool(IClientConfig config, OkHttpClientConnectionPoolFactory connectionPoolFactory) { RibbonProperties ribbon = RibbonProperties.from(config); int maxTotalConnections = ribbon.maxTotalConnections(); long timeToLive = ribbon.poolKeepAliveTime(); TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits(); return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit); } @Bean @ConditionalOnMissingBean(OkHttpClient.class) public OkHttpClient client(OkHttpClientFactory httpClientFactory, ConnectionPool connectionPool, IClientConfig config) { RibbonProperties ribbon = RibbonProperties.from(config); this.httpClient = httpClientFactory.createBuilder(false) .connectTimeout(ribbon.connectTimeout(), TimeUnit.MILLISECONDS) .readTimeout(ribbon.readTimeout(), TimeUnit.MILLISECONDS) .followRedirects(ribbon.isFollowRedirects()) .connectionPool(connectionPool) .build(); return this.httpClient; } @PreDestroy public void destroy() { if(httpClient != null) { httpClient.dispatcher().executorService().shutdown(); httpClient.connectionPool().evictAll(); } } } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate") public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, LoadBalancedRetryFactory loadBalancedRetryFactory, OkHttpClient delegate) { RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(delegate, config, serverIntrospector, loadBalancedRetryFactory); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate") public OkHttpLoadBalancingClient okHttpLoadBalancingClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, OkHttpClient delegate) { OkHttpLoadBalancingClient client = new OkHttpLoadBalancingClient(delegate, config, serverIntrospector); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; }}
- 配置的是OkHttpLoadBalancingClient,如果有retry,则是RetryableOkHttpLoadBalancingClient
小结
AbstractLoadBalancingClient有appche httpclient以及okhttp两类实现,分别是RibbonLoadBalancingHttpClient及OkHttpLoadBalancingClient;每种实现都有retry相关的实现,分别是RetryableRibbonLoadBalancingHttpClient、RetryableOkHttpLoadBalancingClient。