JAMES-2592 Add cluster.name in ElasticSearch client settings

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/987be509
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/987be509
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/987be509

Branch: refs/heads/master
Commit: 987be509263c6f32188860f3fdef077afcfa0b1d
Parents: 644266c
Author: Antoine Duprat <adup...@linagora.com>
Authored: Tue Nov 13 16:24:53 2018 +0100
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Thu Nov 15 09:05:42 2018 +0700

----------------------------------------------------------------------
 .../james/backends/es/ClientProviderImpl.java   | 34 ++++++++++---
 .../es/ClientProviderImplConnectionTest.java    | 13 +++--
 .../backends/es/ClientProviderImplTest.java     | 52 ++++++++++++++------
 .../mailbox/ElasticSearchMailboxModule.java     |  2 +-
 .../apache/james/metric/es/ESReporterTest.java  |  3 +-
 5 files changed, 73 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
index 317c0b7..8d92ae9 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
@@ -19,44 +19,53 @@
 package org.apache.james.backends.es;
 
 import java.net.InetAddress;
+import java.util.Optional;
 
 import org.apache.james.util.Host;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ConsumerChainer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 public class ClientProviderImpl implements ClientProvider {
 
-    public static ClientProviderImpl forHost(String address, Integer port) {
-        return new ClientProviderImpl(ImmutableList.of(Host.from(address, 
port)));
+    public static ClientProviderImpl forHost(String address, Integer port, 
Optional<String> clusterName) {
+        return new ClientProviderImpl(ImmutableList.of(Host.from(address, 
port)), clusterName);
     }
 
-    public static ClientProviderImpl fromHostsString(String hostsString) {
+    public static ClientProviderImpl fromHostsString(String hostsString, 
Optional<String> clusterName) {
         Preconditions.checkNotNull(hostsString, "HostString should not be 
null");
-        return new ClientProviderImpl(Host.parseHosts(hostsString));
+        return new ClientProviderImpl(Host.parseHosts(hostsString), 
clusterName);
     }
 
-    public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts) {
+    public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, 
Optional<String> clusterName) {
         Preconditions.checkNotNull(hosts, "Hosts should not be null");
-        return new ClientProviderImpl(hosts);
+        return new ClientProviderImpl(hosts, clusterName);
     }
 
+    private static final String CLUSTER_NAME_SETTING = "cluster.name";
+
     private final ImmutableList<Host> hosts;
+    private final Optional<String> clusterName;
 
-    private ClientProviderImpl(ImmutableList<Host> hosts) {
+    private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> 
clusterName) {
         Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at 
least one host");
         this.hosts = hosts;
+        this.clusterName = clusterName;
     }
 
 
     @Override
     public Client get() {
-        TransportClient transportClient = TransportClient.builder().build();
+        TransportClient transportClient = TransportClient.builder()
+                .settings(settings())
+                .build();
         ConsumerChainer<Host> consumer = Throwing.consumer(host -> 
transportClient
             .addTransportAddress(
                 new InetSocketTransportAddress(
@@ -65,4 +74,13 @@ public class ClientProviderImpl implements ClientProvider {
         hosts.forEach(consumer.sneakyThrow());
         return transportClient;
     }
+
+    @VisibleForTesting Settings settings() {
+        if (clusterName.isPresent()) {
+            return Settings.builder()
+                    .put(CLUSTER_NAME_SETTING, clusterName.get())
+                    .build();
+        }
+        return Settings.EMPTY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
index aa0a8e9..56f950d 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backends.es;
 
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.util.docker.SwarmGenericContainer;
@@ -52,7 +53,7 @@ public class ClientProviderImplConnectionTest {
         Awaitility.await()
             .atMost(1, TimeUnit.MINUTES)
             .pollInterval(5, TimeUnit.SECONDS)
-            .until(() -> 
isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300)));
+            .until(() -> 
isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300, 
Optional.empty())));
     }
 
     @Test
@@ -62,8 +63,9 @@ public class ClientProviderImplConnectionTest {
             .pollInterval(5, TimeUnit.SECONDS)
             .until(() -> isConnected(
                 ClientProviderImpl.fromHostsString(
-                    es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
-                    + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT)));
+                            es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + 
","
+                            + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT
+                        , Optional.empty())));
     }
 
     @Test
@@ -75,8 +77,9 @@ public class ClientProviderImplConnectionTest {
             .pollInterval(5, TimeUnit.SECONDS)
             .until(() -> isConnected(
                 ClientProviderImpl.fromHostsString(
-                    es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
-                    + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT)));
+                            es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + 
","
+                            + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT
+                        , Optional.empty())));
     }
 
     private boolean isConnected(ClientProvider clientProvider) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
index a97eb64..95f1014 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplTest.java
@@ -19,10 +19,15 @@
 
 package org.apache.james.backends.es;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.elasticsearch.common.settings.Settings;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Optional;
+
 public class ClientProviderImplTest {
 
     @Rule
@@ -32,111 +37,126 @@ public class ClientProviderImplTest {
     public void fromHostsStringShouldThrowOnNullString() {
         expectedException.expect(NullPointerException.class);
 
-        ClientProviderImpl.fromHostsString(null);
+        ClientProviderImpl.fromHostsString(null, Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnEmptyString() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("");
+        ClientProviderImpl.fromHostsString("", Optional.empty());
     }
 
     @Test
     public void forHostShouldThrowOnNullHost() {
         expectedException.expect(NullPointerException.class);
 
-        ClientProviderImpl.forHost(null, 9200);
+        ClientProviderImpl.forHost(null, 9200, Optional.empty());
     }
 
     @Test
     public void forHostShouldThrowOnEmptyHost() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.forHost("", 9200);
+        ClientProviderImpl.forHost("", 9200, Optional.empty());
     }
 
     @Test
     public void forHostShouldThrowOnNegativePort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.forHost("localhost", -1);
+        ClientProviderImpl.forHost("localhost", -1, Optional.empty());
     }
 
     @Test
     public void forHostShouldThrowOnZeroPort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.forHost("localhost", 0);
+        ClientProviderImpl.forHost("localhost", 0, Optional.empty());
     }
 
     @Test
     public void forHostShouldThrowOnTooBigPort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.forHost("localhost", 65536);
+        ClientProviderImpl.forHost("localhost", 65536, Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldEmptyAddress() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString(":9200");
+        ClientProviderImpl.fromHostsString(":9200", Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnAbsentPort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost");
+        ClientProviderImpl.fromHostsString("localhost", Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowWhenTooMuchParts() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:9200:9200");
+        ClientProviderImpl.fromHostsString("localhost:9200:9200", 
Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnEmptyPort() {
         expectedException.expect(NumberFormatException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:");
+        ClientProviderImpl.fromHostsString("localhost:", Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnInvalidPort() {
         expectedException.expect(NumberFormatException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:invalid");
+        ClientProviderImpl.fromHostsString("localhost:invalid", 
Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnNegativePort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:-1");
+        ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnZeroPort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:0");
+        ClientProviderImpl.fromHostsString("localhost:0", Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowOnTooBigPort() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:65536");
+        ClientProviderImpl.fromHostsString("localhost:65536", 
Optional.empty());
     }
 
     @Test
     public void fromHostsStringShouldThrowIfOneHostIsInvalid() {
         expectedException.expect(IllegalArgumentException.class);
 
-        ClientProviderImpl.fromHostsString("localhost:9200,localhost");
+        ClientProviderImpl.fromHostsString("localhost:9200,localhost", 
Optional.empty());
+    }
+
+    @Test
+    public void settingsShouldBeEmptyWhenClusterNameIsEmpty() {
+        ClientProviderImpl clientProvider = 
ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty());
+
+        assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY);
+    }
+
+    @Test
+    public void 
settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() {
+        String clusterName = "myClusterName";
+        ClientProviderImpl clientProvider = 
ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName));
+
+        
assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index ad05503..56b7fbc 100644
--- 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -144,7 +144,7 @@ public class ElasticSearchMailboxModule extends 
AbstractModule {
                                     ElasticSearchQuotaConfiguration 
quotaConfiguration) {
         LOGGER.info("Trying to connect to ElasticSearch service at {}", 
LocalDateTime.now());
 
-        Client client = 
ClientProviderImpl.fromHosts(configuration.getHosts()).get();
+        Client client = ClientProviderImpl.fromHosts(configuration.getHosts(), 
configuration.getClusterName()).get();
 
         MailboxIndexCreationUtil.prepareClient(client,
             mailboxConfiguration.getReadAliasMailboxName(),

http://git-wip-us.apache.org/repos/asf/james-project/blob/987be509/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java
 
b/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java
index d602f8b..30820f9 100644
--- 
a/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java
+++ 
b/server/container/metrics/metrics-es-reporter/src/test/java/org/apache/james/metric/es/ESReporterTest.java
@@ -21,6 +21,7 @@ package org.apache.james.metric.es;
 
 import static org.awaitility.Awaitility.await;
 
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -67,7 +68,7 @@ public class ESReporterTest {
 
     @Before
     public void setUp() {
-        clientProvider = ClientProviderImpl.forHost(esContainer.getHostIp(), 
esContainer.getMappedPort(ES_APPLICATIVE_PORT));
+        clientProvider = ClientProviderImpl.forHost(esContainer.getHostIp(), 
esContainer.getMappedPort(ES_APPLICATIVE_PORT), Optional.empty());
         await().atMost(Duration.ONE_MINUTE)
             .until(() -> elasticSearchStarted(clientProvider));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to