This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e3298980db6c4fa3eadc64cdce958c325e4a71ae Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Jun 19 15:17:25 2019 +0200 JAMES-2803 allow configuration of ES request timeout --- .../james/backends/es/ClientProviderImpl.java | 47 +++------ .../backends/es/ElasticSearchConfiguration.java | 25 ++++- .../es/ClientProviderImplConnectionTest.java | 30 +++--- .../james/backends/es/ClientProviderImplTest.java | 109 +-------------------- .../james/backends/es/DockerElasticSearch.java | 8 +- .../modules/mailbox/ElasticSearchClientModule.java | 2 +- .../apache/james/DockerElasticSearchExtension.java | 11 +++ 7 files changed, 71 insertions(+), 161 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java index 3ba32f0..7816e8d 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java @@ -18,63 +18,48 @@ ****************************************************************/ package org.apache.james.backends.es; -import java.util.Optional; - import org.apache.http.HttpHost; -import org.apache.james.util.Host; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.settings.Settings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; public class ClientProviderImpl implements ClientProvider { - public static ClientProviderImpl forHost(String address, Integer port, Optional<String> clusterName) { - return new ClientProviderImpl(ImmutableList.of(Host.from(address, port)), clusterName); - } - - public static ClientProviderImpl 
fromHostsString(String hostsString, Optional<String> clusterName) { - Preconditions.checkNotNull(hostsString, "HostString should not be null"); - return new ClientProviderImpl(Host.parseHosts(hostsString), clusterName); - } - - public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, Optional<String> clusterName) { - Preconditions.checkNotNull(hosts, "Hosts should not be null"); - return new ClientProviderImpl(hosts, clusterName); + public static ClientProviderImpl fromConfiguration(ElasticSearchConfiguration configuration) { + Preconditions.checkNotNull(configuration); + return new ClientProviderImpl(configuration); } private static final String CLUSTER_NAME_SETTING = "cluster.name"; private static final String HTTP_HOST_SCHEME = "http"; + private final ElasticSearchConfiguration configuration; - private final ImmutableList<Host> hosts; - private final Optional<String> clusterName; - - private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> clusterName) { - Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at least one host"); - this.hosts = hosts; - this.clusterName = clusterName; + private ClientProviderImpl(ElasticSearchConfiguration configuration) { + this.configuration = configuration; } private HttpHost[] hostsToHttpHosts() { - return hosts.stream() + return configuration.getHosts().stream() .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME)) .toArray(HttpHost[]::new); } @Override public RestHighLevelClient get() { - return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts())); + RestClientBuilder restClient = RestClient.builder(hostsToHttpHosts()) + .setMaxRetryTimeoutMillis(Math.toIntExact(configuration.getRequestTimeout().toMillis())); + return new RestHighLevelClient(restClient); } @VisibleForTesting Settings settings() { - if (clusterName.isPresent()) { - return Settings.builder() - .put(CLUSTER_NAME_SETTING, clusterName.get()) - .build(); - } - return Settings.EMPTY; + 
return configuration.getClusterName() + .map(clusterName -> Settings.builder() + .put(CLUSTER_NAME_SETTING, clusterName) + .build()) + .orElse(Settings.EMPTY); } } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java index a1dec4d..ed3199d 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java @@ -19,6 +19,7 @@ package org.apache.james.backends.es; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -46,6 +47,7 @@ public class ElasticSearchConfiguration { private Optional<Integer> nbReplica; private Optional<Integer> minDelay; private Optional<Integer> maxRetries; + private Optional<Duration> requestTimeout; public Builder() { hosts = ImmutableList.builder(); @@ -54,6 +56,7 @@ public class ElasticSearchConfiguration { nbReplica = Optional.empty(); minDelay = Optional.empty(); maxRetries = Optional.empty(); + requestTimeout = Optional.empty(); } public Builder addHost(Host host) { @@ -93,6 +96,11 @@ public class ElasticSearchConfiguration { return this; } + public Builder requestTimeout(Optional<Duration> requestTimeout) { + this.requestTimeout = requestTimeout; + return this; + } + public ElasticSearchConfiguration build() { ImmutableList<Host> hosts = this.hosts.build(); Preconditions.checkState(!hosts.isEmpty(), "You need to specify ElasticSearch host"); @@ -102,7 +110,8 @@ public class ElasticSearchConfiguration { nbShards.orElse(DEFAULT_NB_SHARDS), nbReplica.orElse(DEFAULT_NB_REPLICA), minDelay.orElse(DEFAULT_CONNECTION_MIN_DELAY), - maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES)); + maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES), + 
requestTimeout.orElse(DEFAULT_REQUEST_TIMEOUT)); } } @@ -121,6 +130,7 @@ public class ElasticSearchConfiguration { public static final int DEFAULT_CONNECTION_MAX_RETRIES = 7; public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000; + public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); public static final int DEFAULT_NB_SHARDS = 5; public static final int DEFAULT_NB_REPLICA = 1; public static final int DEFAULT_PORT = 9200; @@ -184,14 +194,16 @@ public class ElasticSearchConfiguration { private final int nbReplica; private final int minDelay; private final int maxRetries; + private final Duration requestTimeout; - private ElasticSearchConfiguration(ImmutableList<Host> hosts, Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int maxRetries) { + private ElasticSearchConfiguration(ImmutableList<Host> hosts, Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int maxRetries, Duration requestTimeout) { this.hosts = hosts; this.clusterName = clusterName; this.nbShards = nbShards; this.nbReplica = nbReplica; this.minDelay = minDelay; this.maxRetries = maxRetries; + this.requestTimeout = requestTimeout; } public ImmutableList<Host> getHosts() { @@ -218,6 +230,10 @@ public class ElasticSearchConfiguration { return maxRetries; } + public Duration getRequestTimeout() { + return requestTimeout; + } + @Override public final boolean equals(Object o) { if (o instanceof ElasticSearchConfiguration) { @@ -228,13 +244,14 @@ public class ElasticSearchConfiguration { && Objects.equals(this.nbReplica, that.nbReplica) && Objects.equals(this.minDelay, that.minDelay) && Objects.equals(this.maxRetries, that.maxRetries) - && Objects.equals(this.hosts, that.hosts); + && Objects.equals(this.hosts, that.hosts) + && Objects.equals(this.requestTimeout, that.requestTimeout); } return false; } @Override public final int hashCode() { - return Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, maxRetries); + return 
Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, maxRetries, requestTimeout); } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java index 149aa07..72ef84a 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java @@ -19,9 +19,9 @@ package org.apache.james.backends.es; -import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.apache.james.util.Host; import org.apache.james.util.docker.DockerGenericContainer; import org.apache.james.util.docker.Images; import org.awaitility.Awaitility; @@ -53,22 +53,27 @@ public class ClientProviderImplConnectionTest { @Test public void connectingASingleServerShouldWork() { + ElasticSearchConfiguration configuration = ElasticSearchConfiguration.builder() + .addHost(Host.from(es1.getContainerIp(), ES_APPLICATIVE_PORT)) + .build(); + Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), ES_APPLICATIVE_PORT, Optional.empty()))); + .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); } @Test public void connectingAClusterShouldWork() { + ElasticSearchConfiguration configuration = ElasticSearchConfiguration.builder() + .addHost(Host.from(es1.getContainerIp(), ES_APPLICATIVE_PORT)) + .addHost(Host.from(es2.getContainerIp(), ES_APPLICATIVE_PORT)) + .build(); + Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected( - ClientProviderImpl.fromHostsString( - es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + "," - + es2.getContainerIp() + ":" + 
ES_APPLICATIVE_PORT, - Optional.empty()))); + .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); } @Test @@ -77,14 +82,15 @@ public class ClientProviderImplConnectionTest { String es2Ip = es2.getContainerIp(); es2.stop(); + ElasticSearchConfiguration configuration = ElasticSearchConfiguration.builder() + .addHost(Host.from(es1Ip, ES_APPLICATIVE_PORT)) + .addHost(Host.from(es2Ip, ES_APPLICATIVE_PORT)) + .build(); + Awaitility.await() .atMost(1, TimeUnit.MINUTES) .pollInterval(5, TimeUnit.SECONDS) - .until(() -> isConnected( - ClientProviderImpl.fromHostsString( - es1Ip + ":" + ES_APPLICATIVE_PORT + "," - + es2Ip + ":" + ES_APPLICATIVE_PORT, - Optional.empty()))); + .until(() -> isConnected(ClientProviderImpl.fromConfiguration(configuration))); } private boolean isConnected(ClientProvider clientProvider) { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java index dac6ce3..949ad74 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java @@ -30,113 +30,8 @@ import org.junit.Test; public class ClientProviderImplTest { @Test - public void fromHostsStringShouldThrowOnNullString() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(null, Optional.empty())) + public void fromConfigurationShouldThrowOnNull() { + assertThatThrownBy(() -> ClientProviderImpl.fromConfiguration(null)) .isInstanceOf(NullPointerException.class); } - - @Test - public void fromHostsStringShouldThrowOnEmptyString() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void forHostShouldThrowOnNullHost() { - assertThatThrownBy(() -> 
ClientProviderImpl.forHost(null, 9200, Optional.empty())) - .isInstanceOf(NullPointerException.class); - } - - @Test - public void forHostShouldThrowOnEmptyHost() { - assertThatThrownBy(() -> ClientProviderImpl.forHost("", 9200, Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void forHostShouldThrowOnNegativePort() { - assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", -1, Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void forHostShouldThrowOnZeroPort() { - assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 0, Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void forHostShouldThrowOnTooBigPort() { - assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 65536, Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldEmptyAddress() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(":9200", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowOnAbsentPort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowWhenTooMuchParts() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200:9200", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowOnEmptyPort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:", Optional.empty())) - .isInstanceOf(NumberFormatException.class); - } - - @Test - public void fromHostsStringShouldThrowOnInvalidPort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:invalid", Optional.empty())) - .isInstanceOf(NumberFormatException.class); - } - - @Test - 
public void fromHostsStringShouldThrowOnNegativePort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowOnZeroPort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:0", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowOnTooBigPort() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:65536", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void fromHostsStringShouldThrowIfOneHostIsInvalid() { - assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200,localhost", Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void settingsShouldBeEmptyWhenClusterNameIsEmpty() { - ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty()); - - assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY); - } - - @Test - public void settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() { - String clusterName = "myClusterName"; - ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName)); - - assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName); - } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java index e2d5f78..82eb83b 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java @@ -21,8 +21,6 @@ package org.apache.james.backends.es; import static 
org.assertj.core.api.Assertions.assertThat; -import java.util.Optional; - import org.apache.http.HttpStatus; import org.apache.james.util.Host; import org.apache.james.util.docker.DockerGenericContainer; @@ -30,9 +28,7 @@ import org.apache.james.util.docker.Images; import org.apache.james.util.docker.RateLimiters; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import feign.Feign; import feign.Logger; import feign.RequestLine; @@ -117,8 +113,8 @@ public class DockerElasticSearch { } public ClientProvider clientProvider() { - Optional<String> noClusterName = Optional.empty(); - return ClientProviderImpl.fromHosts(ImmutableList.of(getHttpHost()), noClusterName); + ElasticSearchConfiguration configuration = ElasticSearchConfiguration.builder().addHost(getHttpHost()).build(); + return ClientProviderImpl.fromConfiguration(configuration); } private ElasticSearchAPI esAPI() { diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java index e485fee..020a365 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java @@ -62,7 +62,7 @@ public class ElasticSearchClientModule extends AbstractModule { private RestHighLevelClient connectToCluster(ElasticSearchConfiguration configuration) throws IOException { LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now()); - return ClientProviderImpl.fromHosts(configuration.getHosts(), configuration.getClusterName()) + return ClientProviderImpl.fromConfiguration(configuration) .get(); } } diff --git 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java index 531bf13..7ff07f4 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DockerElasticSearchExtension.java @@ -19,6 +19,9 @@ package org.apache.james; +import java.time.Duration; +import java.util.Optional; + import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchConfiguration; @@ -29,13 +32,20 @@ import com.google.inject.Module; public class DockerElasticSearchExtension implements GuiceModuleTestExtension { private final DockerElasticSearch dockerElasticSearch; + private Optional<Duration> requestTimeout; public DockerElasticSearchExtension() { this(DockerElasticSearchSingleton.INSTANCE); } + public DockerElasticSearchExtension withRequestTimeout(Duration requestTimeout) { + this.requestTimeout = Optional.of(requestTimeout); + return this; + } + public DockerElasticSearchExtension(DockerElasticSearch dockerElasticSearch) { this.dockerElasticSearch = dockerElasticSearch; + requestTimeout = Optional.empty(); } @Override @@ -61,6 +71,7 @@ public class DockerElasticSearchExtension implements GuiceModuleTestExtension { private ElasticSearchConfiguration getElasticSearchConfigurationForDocker() { return ElasticSearchConfiguration.builder() .addHost(getDockerES().getHttpHost()) + .requestTimeout(requestTimeout) .build(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
