This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch opensearch-upgrade
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5ccbc1a029e307e37fe071961e885a59d974b6e7
Author: Rene Cordier <[email protected]>
AuthorDate: Tue Jul 18 17:12:50 2023 +0700

    JAMES-3929 Upgrade to opensearch-java 2.6.0
    
    -> Refactor code to use http5 and drop the rest client
---
 backends-common/opensearch/pom.xml                 | 19 +++--
 .../james/backends/opensearch/ClientProvider.java  | 94 +++++++++++++---------
 .../opensearch/ReactorOpenSearchClient.java        | 11 +--
 3 files changed, 69 insertions(+), 55 deletions(-)

diff --git a/backends-common/opensearch/pom.xml 
b/backends-common/opensearch/pom.xml
index 55782b45bc..4240e99d79 100644
--- a/backends-common/opensearch/pom.xml
+++ b/backends-common/opensearch/pom.xml
@@ -71,6 +71,18 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-configuration2</artifactId>
         </dependency>
+        <!-- Needed for opensearch-java dependency -->
+        <dependency>
+            <groupId>org.apache.httpcomponents.client5</groupId>
+            <artifactId>httpclient5</artifactId>
+            <version>5.1.4</version>
+        </dependency>
+        <!-- Needed for opensearch-java dependency -->
+        <dependency>
+            <groupId>org.apache.httpcomponents.core5</groupId>
+            <artifactId>httpcore5</artifactId>
+            <version>5.1.5</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-to-slf4j</artifactId>
@@ -83,12 +95,7 @@
         <dependency>
             <groupId>org.opensearch.client</groupId>
             <artifactId>opensearch-java</artifactId>
-            <version>2.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opensearch.client</groupId>
-            <artifactId>opensearch-rest-client</artifactId>
-            <version>2.4.0</version>
+            <version>2.6.0</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
diff --git 
a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
 
b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
index d253fc6d6b..b6bccdb588 100644
--- 
a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
+++ 
b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
@@ -25,6 +25,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -34,21 +35,24 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.time.DurationFormatUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.apache.http.ssl.TrustStrategy;
-import org.apache.james.backends.opensearch.json.jackson.JacksonJsonpMapper;
+import org.apache.hc.client5.http.auth.AuthScope;
+import org.apache.hc.client5.http.auth.CredentialsStore;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import 
org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.ssl.SSLContextBuilder;
+import org.apache.hc.core5.ssl.TrustStrategy;
 import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.opensearch.client.RestClient;
 import org.opensearch.client.opensearch.OpenSearchAsyncClient;
-import org.opensearch.client.transport.rest_client.RestClientTransport;
+import org.opensearch.client.transport.OpenSearchTransport;
+import 
org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +63,7 @@ import reactor.util.retry.Retry;
 public class ClientProvider implements Provider<ReactorOpenSearchClient> {
 
     private static class HttpAsyncClientConfigurer {
-
+        private static final AuthScope ANY = new AuthScope(null, null, -1, 
null, null);
         private static final TrustStrategy TRUST_ALL = (x509Certificates, 
authType) -> true;
         private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, 
sslSession) -> true;
 
@@ -74,9 +78,6 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
             configureHostScheme(builder);
             configureTimeout(builder);
 
-            
configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
-            
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);
-
             
builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver"));
 
             return builder;
@@ -98,19 +99,35 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
         }
 
         private void configureSSLOptions(HttpAsyncClientBuilder builder) {
-            try {
-                builder
-                    .setSSLContext(sslContext())
-                    .setSSLHostnameVerifier(hostnameVerifier());
-            } catch (NoSuchAlgorithmException | KeyManagementException | 
KeyStoreException | CertificateException | IOException e) {
-                throw new RuntimeException("Cannot set SSL options to the 
builder", e);
-            }
+            builder.setConnectionManager(connectionManager());
         }
 
         private void configureTimeout(HttpAsyncClientBuilder builder) {
             builder.setDefaultRequestConfig(requestConfig());
         }
 
+        private PoolingAsyncClientConnectionManager connectionManager() {
+            PoolingAsyncClientConnectionManagerBuilder builder = 
PoolingAsyncClientConnectionManagerBuilder
+                .create()
+                .setTlsStrategy(tlsStrategy());
+
+            
configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
+            
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);
+
+            return builder.build();
+        }
+
+        private TlsStrategy tlsStrategy() {
+            try {
+                return ClientTlsStrategyBuilder.create()
+                    .setSslContext(sslContext())
+                    .setHostnameVerifier(hostnameVerifier())
+                    .build();
+            } catch (NoSuchAlgorithmException | KeyManagementException | 
KeyStoreException | CertificateException | IOException e) {
+                throw new RuntimeException("Cannot set SSL options to the 
builder", e);
+            }
+        }
+
         private SSLContext sslContext() throws KeyStoreException, 
NoSuchAlgorithmException, KeyManagementException,
             CertificateException, IOException {
 
@@ -151,9 +168,8 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
 
         private RequestConfig requestConfig() {
             return RequestConfig.custom()
-                    
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
-                    
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
-                    
.setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
+                    
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()),
 TimeUnit.MILLISECONDS)
+                    
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()),
 TimeUnit.MILLISECONDS)
                     .build();
         }
 
@@ -171,9 +187,9 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
         private void configureAuthentication(HttpAsyncClientBuilder builder) {
             configuration.getCredential()
                 .ifPresent(credential -> {
-                    CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-                    credentialsProvider.setCredentials(AuthScope.ANY,
-                        new 
UsernamePasswordCredentials(credential.getUsername(), 
String.valueOf(credential.getPassword())));
+                    CredentialsStore credentialsProvider = new 
BasicCredentialsProvider();
+                    credentialsProvider.setCredentials(ANY,
+                        new 
UsernamePasswordCredentials(credential.getUsername(), 
credential.getPassword()));
                     builder.setDefaultCredentialsProvider(credentialsProvider);
                 });
         }
@@ -182,7 +198,7 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientProvider.class);
 
     private final OpenSearchConfiguration configuration;
-    private final RestClient lowLevelRestClient;
+    private final OpenSearchTransport openSearchTransport;
     private final OpenSearchAsyncClient openSearchClient;
     private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
     private final ReactorOpenSearchClient client;
@@ -191,13 +207,13 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
     public ClientProvider(OpenSearchConfiguration configuration) {
         this.httpAsyncClientConfigurer = new 
HttpAsyncClientConfigurer(configuration);
         this.configuration = configuration;
-        this.lowLevelRestClient = buildRestClient();
+        this.openSearchTransport = buildTransport();
         this.openSearchClient = connect();
-        this.client = new ReactorOpenSearchClient(this.openSearchClient, 
lowLevelRestClient);
+        this.client = new ReactorOpenSearchClient(this.openSearchClient);
     }
 
-    private RestClient buildRestClient() {
-        return RestClient.builder(hostsToHttpHosts())
+    private OpenSearchTransport buildTransport() {
+        return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts())
             .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
             .build();
     }
@@ -217,14 +233,12 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
     private OpenSearchAsyncClient connectToCluster() {
         LOGGER.info("Trying to connect to OpenSearch service at {}", 
LocalDateTime.now());
 
-        RestClientTransport transport = new 
RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
-
-        return new OpenSearchAsyncClient(transport);
+        return new OpenSearchAsyncClient(openSearchTransport);
     }
 
     private HttpHost[] hostsToHttpHosts() {
         return configuration.getHosts().stream()
-            .map(host -> new HttpHost(host.getHostName(), host.getPort(), 
configuration.getHostScheme().name()))
+            .map(host -> new HttpHost(configuration.getHostScheme().name(), 
host.getHostName(), host.getPort()))
             .toArray(HttpHost[]::new);
     }
 
@@ -235,6 +249,6 @@ public class ClientProvider implements 
Provider<ReactorOpenSearchClient> {
 
     @PreDestroy
     public void close() throws IOException {
-        lowLevelRestClient.close();
+        openSearchTransport.close();
     }
 }
diff --git 
a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
 
b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
index 05e41ecebe..4b722d3af2 100644
--- 
a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
+++ 
b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
@@ -22,7 +22,6 @@ package org.apache.james.backends.opensearch;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import org.opensearch.client.RestClient;
 import org.opensearch.client.opensearch.OpenSearchAsyncClient;
 import org.opensearch.client.opensearch.cluster.HealthRequest;
 import org.opensearch.client.opensearch.cluster.HealthResponse;
@@ -58,11 +57,9 @@ import reactor.core.scheduler.Schedulers;
 
 public class ReactorOpenSearchClient implements AutoCloseable {
     private final OpenSearchAsyncClient client;
-    private final RestClient lowLevelRestClient;
 
-    public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient 
lowLevelRestClient) {
+    public ReactorOpenSearchClient(OpenSearchAsyncClient client) {
         this.client = client;
-        this.lowLevelRestClient = lowLevelRestClient;
     }
 
     public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException 
{
@@ -81,10 +78,6 @@ public class ReactorOpenSearchClient implements 
AutoCloseable {
         return toReactor(client.deleteByQuery(deleteRequest));
     }
 
-    public RestClient getLowLevelClient() {
-        return lowLevelRestClient;
-    }
-
     public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws 
IOException {
         return toReactor(client.index(indexRequest));
     }
@@ -127,7 +120,7 @@ public class ReactorOpenSearchClient implements 
AutoCloseable {
 
     @Override
     public void close() throws IOException {
-        lowLevelRestClient.close();
+        client._transport().close();
     }
 
     private static <T> Mono<T> toReactor(CompletableFuture<T> async) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to