http://git-wip-us.apache.org/repos/asf/curator/blob/57dbf2e1/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 44f9d00..297cf9b 100644
---
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import org.apache.curator.ensemble.EnsembleListener;
@@ -27,6 +28,7 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -34,7 +36,6 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
@@ -45,32 +46,33 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-public class TestReconfiguration {
-
- TestingCluster cluster;
- DynamicEnsembleProvider dynamicEnsembleProvider;
- WaitOnDelegateListener waitOnDelegateListener;
- EnsembleTracker ensembleTracker;
- CuratorFramework client;
+public class TestReconfiguration
+{
+ private TestingCluster cluster;
+ private DynamicEnsembleProvider dynamicEnsembleProvider;
+ private WaitOnDelegateListener waitOnDelegateListener;
+ private EnsembleTracker ensembleTracker;
+ private CuratorFramework client;
- String connectionString1to5;
- String connectionString2to5;
- String connectionString3to5;
+ private String connectionString1to5;
+ private String connectionString2to5;
+ private String connectionString3to5;
@BeforeMethod
- public void setup() throws Exception {
+ public void setup() throws Exception
+ {
cluster = new TestingCluster(5);
cluster.start();
connectionString1to5 = cluster.getConnectString();
- connectionString2to5 = getConnectionString(cluster, 2,3,4,5);
- connectionString3to5 = getConnectionString(cluster, 3,4,5);
+ connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5);
+ connectionString3to5 = getConnectionString(cluster, 3, 4, 5);
dynamicEnsembleProvider = new
DynamicEnsembleProvider(connectionString1to5);
client = CuratorFrameworkFactory.builder()
- .ensembleProvider(dynamicEnsembleProvider)
- .retryPolicy(new RetryOneTime(1))
- .build();
+ .ensembleProvider(dynamicEnsembleProvider)
+ .retryPolicy(new RetryOneTime(1))
+ .build();
client.start();
client.blockUntilConnected();
@@ -84,14 +86,16 @@ public class TestReconfiguration {
}
@AfterMethod
- public void tearDown() throws IOException {
- ensembleTracker.close();
- client.close();
- cluster.close();
+ public void tearDown() throws IOException
+ {
+ CloseableUtils.closeQuietly(ensembleTracker);
+ CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(cluster);
}
@Test
- public void testSyncIncremental() throws Exception {
+ public void testSyncIncremental() throws Exception
+ {
Stat stat = new Stat();
byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
Assert.assertNotNull(bytes);
@@ -132,15 +136,19 @@ public class TestReconfiguration {
}
@Test
- public void testAsyncIncremental() throws Exception {
- final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
- final BackgroundCallback callback = new BackgroundCallback() {
+ public void testAsyncIncremental() throws Exception
+ {
+ final AtomicReference<byte[]> bytes = new AtomicReference<>();
+ final BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception {
+ public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception
+ {
bytes.set(event.getData());
//We only need the latch on getConfig.
- if (event.getContext() != null) {
- ((CountDownLatch) event.getContext()).countDown();
+ if ( event.getContext() != null )
+ {
+ ((CountDownLatch)event.getContext()).countDown();
}
}
@@ -155,29 +163,27 @@ public class TestReconfiguration {
String server1 = getServerString(qv, cluster, 1L);
String server2 = getServerString(qv, cluster, 2L);
-
//Remove Servers
-
client.reconfig().leaving("1").fromConfig(qv.getVersion()).inBackground(callback).forEnsemble();
+
client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
-
client.reconfig().leaving("2").fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback,
latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString3to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 3);
//Add Servers
- client.reconfig().joining("server.2=" +
server2).fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch).joining("server.2=" +
server2).fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
-
- client.reconfig().joining("server.1=" +
server1).fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch).joining("server.1=" +
server1).fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString1to5);
qv = getQuorumVerifier(bytes.get());
@@ -185,7 +191,8 @@ public class TestReconfiguration {
}
@Test
- public void testSyncNonIncremental() throws Exception {
+ public void testSyncNonIncremental() throws Exception
+ {
Stat stat = new Stat();
byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
Assert.assertNotNull(bytes);
@@ -199,11 +206,11 @@ public class TestReconfiguration {
//Remove Servers
bytes = client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
@@ -211,10 +218,10 @@ public class TestReconfiguration {
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
bytes = client.reconfig()
- .withMembers("server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ .adding("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 3);
@@ -224,11 +231,11 @@ public class TestReconfiguration {
//Add Servers
bytes = client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
@@ -236,12 +243,12 @@ public class TestReconfiguration {
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
bytes = client.reconfig()
- .withMembers("server.1=" + server1,
- "server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
+ .adding("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 5);
@@ -250,13 +257,16 @@ public class TestReconfiguration {
}
@Test
- public void testAsyncNonIncremental() throws Exception {
- final AtomicReference<byte[]> bytes = new AtomicReference<byte[]>();
- final BackgroundCallback callback = new BackgroundCallback() {
+ public void testAsyncNonIncremental() throws Exception
+ {
+ final AtomicReference<byte[]> bytes = new AtomicReference<>();
+ final BackgroundCallback callback = new BackgroundCallback()
+ {
@Override
- public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception {
+ public void processResult(CuratorFramework client, CuratorEvent
event) throws Exception
+ {
bytes.set(event.getData());
- ((CountDownLatch) event.getContext()).countDown();
+ ((CountDownLatch)event.getContext()).countDown();
}
};
@@ -274,86 +284,97 @@ public class TestReconfiguration {
String server5 = getServerString(qv, cluster, 5L);
//Remove Servers
- client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
- client.reconfig()
- .withMembers("server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString3to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 3);
//Add Servers
- client.reconfig()
- .withMembers("server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
- client.reconfig()
- .withMembers("server.1=" + server1,
- "server.2=" + server2,
- "server.3=" + server3,
- "server.4=" + server4,
- "server.5=" + server5)
- .fromConfig(qv.getVersion()).inBackground(callback,
latch).forEnsemble();
+ client.reconfig().inBackground(callback, latch)
+ .adding("server.1=" + server1,
+ "server.2=" + server2,
+ "server.3=" + server3,
+ "server.4=" + server4,
+ "server.5=" + server5)
+ .fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(),
connectionString1to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 5);
}
-
- static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception {
+ static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
+ {
Properties properties = new Properties();
properties.load(new StringReader(new String(bytes)));
return new QuorumMaj(properties);
}
- static InstanceSpec getInstance(TestingCluster cluster, int id) {
- for (InstanceSpec spec : cluster.getInstances()) {
- if (spec.getServerId() == id) {
+ static InstanceSpec getInstance(TestingCluster cluster, int id)
+ {
+ for ( InstanceSpec spec : cluster.getInstances() )
+ {
+ if ( spec.getServerId() == id )
+ {
return spec;
}
}
throw new IllegalStateException("InstanceSpec with id:" + id + " not
found");
}
- static String getServerString(QuorumVerifier qv, TestingCluster cluster,
long id) throws Exception {
+ static String getServerString(QuorumVerifier qv, TestingCluster cluster,
long id) throws Exception
+ {
String str = qv.getAllMembers().get(id).toString();
//check if connection string is already there.
- if (str.contains(";")) {
+ if ( str.contains(";") )
+ {
return str;
- } else {
- return str + ";" + getInstance(cluster, (int)
id).getConnectString();
+ }
+ else
+ {
+ return str + ";" + getInstance(cluster,
(int)id).getConnectString();
}
}
- static String getConnectionString(TestingCluster cluster, long... ids)
throws Exception {
+ static String getConnectionString(TestingCluster cluster, long... ids)
throws Exception
+ {
StringBuilder sb = new StringBuilder();
- Map<Long, InstanceSpec> specs = new HashMap<Long, InstanceSpec>();
- for (InstanceSpec spec : cluster.getInstances()) {
- specs.put(new Long(spec.getServerId()), spec);
+ Map<Long, InstanceSpec> specs = new HashMap<>();
+ for ( InstanceSpec spec : cluster.getInstances() )
+ {
+ specs.put((long)spec.getServerId(), spec);
}
- for (long id : ids) {
- if (sb.length() != 0) {
+ for ( long id : ids )
+ {
+ if ( sb.length() != 0 )
+ {
sb.append(",");
}
sb.append(specs.get(id).getConnectString());
@@ -362,27 +383,34 @@ public class TestReconfiguration {
}
//Simple EnsembleListener that can wait until the delegate handles the
event.
- private static class WaitOnDelegateListener implements EnsembleListener {
+ private static class WaitOnDelegateListener implements EnsembleListener
+ {
private CountDownLatch latch = new CountDownLatch(1);
private final EnsembleListener delegate;
- private WaitOnDelegateListener(EnsembleListener delegate) {
+ private WaitOnDelegateListener(EnsembleListener delegate)
+ {
this.delegate = delegate;
}
@Override
- public void connectionStringUpdated(String connectionString) {
+ public void connectionStringUpdated(String connectionString)
+ {
delegate.connectionStringUpdated(connectionString);
latch.countDown();
}
- public void waitForEvent() throws InterruptedException,
TimeoutException {
- if (latch.await(5, TimeUnit.SECONDS)) {
+ public void waitForEvent() throws InterruptedException,
TimeoutException
+ {
+ if ( latch.await(5, TimeUnit.SECONDS) )
+ {
latch = new CountDownLatch(1);
- } else {
+ }
+ else
+ {
throw new TimeoutException("Failed to receive event in time.");
}
}
- };
+ }
}
\ No newline at end of file