This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch opensearch-upgrade in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5ccbc1a029e307e37fe071961e885a59d974b6e7 Author: Rene Cordier <[email protected]> AuthorDate: Tue Jul 18 17:12:50 2023 +0700 JAMES-3929 Upgrade to opensearch-java 2.6.0 -> Refactor code to use http5 and drop the rest client --- backends-common/opensearch/pom.xml | 19 +++-- .../james/backends/opensearch/ClientProvider.java | 94 +++++++++++++--------- .../opensearch/ReactorOpenSearchClient.java | 11 +-- 3 files changed, 69 insertions(+), 55 deletions(-) diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml index 55782b45bc..4240e99d79 100644 --- a/backends-common/opensearch/pom.xml +++ b/backends-common/opensearch/pom.xml @@ -71,6 +71,18 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-configuration2</artifactId> </dependency> + <!-- Needed for opensearch-java dependency --> + <dependency> + <groupId>org.apache.httpcomponents.client5</groupId> + <artifactId>httpclient5</artifactId> + <version>5.1.4</version> + </dependency> + <!-- Needed for opensearch-java dependency --> + <dependency> + <groupId>org.apache.httpcomponents.core5</groupId> + <artifactId>httpcore5</artifactId> + <version>5.1.5</version> + </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> @@ -83,12 +95,7 @@ <dependency> <groupId>org.opensearch.client</groupId> <artifactId>opensearch-java</artifactId> - <version>2.1.0</version> - </dependency> - <dependency> - <groupId>org.opensearch.client</groupId> - <artifactId>opensearch-rest-client</artifactId> - <version>2.4.0</version> + <version>2.6.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java index d253fc6d6b..b6bccdb588 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java @@ -25,6 +25,7 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -34,21 +35,24 @@ import javax.net.ssl.SSLContext; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ssl.DefaultHostnameVerifier; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.TrustStrategy; -import org.apache.james.backends.opensearch.json.jackson.JacksonJsonpMapper; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.CredentialsStore; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.ssl.TrustStrategy; import org.apache.james.util.concurrent.NamedThreadFactory; -import org.opensearch.client.RestClient; import org.opensearch.client.opensearch.OpenSearchAsyncClient; -import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +63,7 @@ import reactor.util.retry.Retry; public class ClientProvider implements Provider<ReactorOpenSearchClient> { private static class HttpAsyncClientConfigurer { - + private static final AuthScope ANY = new AuthScope(null, null, -1, null, null); private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true; private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true; @@ -74,9 +78,6 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { configureHostScheme(builder); configureTimeout(builder); - configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); - configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); - builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver")); return builder; @@ -98,19 +99,35 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { } private void configureSSLOptions(HttpAsyncClientBuilder builder) { - try { - builder - .setSSLContext(sslContext()) - .setSSLHostnameVerifier(hostnameVerifier()); - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { - throw new RuntimeException("Cannot set SSL options to the builder", e); - } + builder.setConnectionManager(connectionManager()); } private void configureTimeout(HttpAsyncClientBuilder builder) { builder.setDefaultRequestConfig(requestConfig()); } + private PoolingAsyncClientConnectionManager connectionManager() { + PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder + .create() + .setTlsStrategy(tlsStrategy()); + + configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); + configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); + + return builder.build(); + } + + private TlsStrategy tlsStrategy() { + try { + return ClientTlsStrategyBuilder.create() + .setSslContext(sslContext()) + .setHostnameVerifier(hostnameVerifier()) + .build(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { + throw new RuntimeException("Cannot set SSL options to the builder", e); + } + } + private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException, IOException { @@ -151,9 +168,8 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { private RequestConfig requestConfig() { return RequestConfig.custom() - .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) + .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) .build(); } @@ -171,9 +187,9 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { private void configureAuthentication(HttpAsyncClientBuilder builder) { configuration.getCredential() .ifPresent(credential -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword()))); + CredentialsStore credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(ANY, + new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword())); builder.setDefaultCredentialsProvider(credentialsProvider); }); } @@ -182,7 +198,7 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); private final OpenSearchConfiguration configuration; - private final RestClient lowLevelRestClient; + private final OpenSearchTransport openSearchTransport; private final OpenSearchAsyncClient openSearchClient; private final HttpAsyncClientConfigurer httpAsyncClientConfigurer; private final ReactorOpenSearchClient client; @@ -191,13 +207,13 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { public ClientProvider(OpenSearchConfiguration configuration) { this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration); this.configuration = configuration; - this.lowLevelRestClient = buildRestClient(); + this.openSearchTransport = buildTransport(); this.openSearchClient = connect(); - this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient); + this.client = new ReactorOpenSearchClient(this.openSearchClient); } - private RestClient buildRestClient() { - return RestClient.builder(hostsToHttpHosts()) + private OpenSearchTransport buildTransport() { + return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts()) .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure) .build(); } @@ -217,14 +233,12 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { private OpenSearchAsyncClient connectToCluster() { LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now()); - RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper()); - - return new OpenSearchAsyncClient(transport); + return new OpenSearchAsyncClient(openSearchTransport); } private HttpHost[] hostsToHttpHosts() { return configuration.getHosts().stream() - .map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name())) + .map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort())) .toArray(HttpHost[]::new); } @@ -235,6 +249,6 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> { @PreDestroy public void close() throws IOException { - lowLevelRestClient.close(); + openSearchTransport.close(); } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java index 05e41ecebe..4b722d3af2 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java @@ -22,7 +22,6 @@ package org.apache.james.backends.opensearch; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.opensearch.client.RestClient; import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.cluster.HealthRequest; import org.opensearch.client.opensearch.cluster.HealthResponse; @@ -58,11 +57,9 @@ import reactor.core.scheduler.Schedulers; public class ReactorOpenSearchClient implements AutoCloseable { private final OpenSearchAsyncClient client; - private final RestClient lowLevelRestClient; - public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) { + public ReactorOpenSearchClient(OpenSearchAsyncClient client) { this.client = client; - this.lowLevelRestClient = lowLevelRestClient; } public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException { @@ -81,10 +78,6 @@ public class ReactorOpenSearchClient implements AutoCloseable { return toReactor(client.deleteByQuery(deleteRequest)); } - public RestClient getLowLevelClient() { - return lowLevelRestClient; - } - public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException { return toReactor(client.index(indexRequest)); } @@ -127,7 +120,7 @@ public class ReactorOpenSearchClient implements AutoCloseable { @Override public void close() throws IOException { - lowLevelRestClient.close(); + client._transport().close(); } private static <T> Mono<T> toReactor(CompletableFuture<T> async) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
