http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java index eb81900..721bfbf 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -18,6 +18,7 @@ package org.apache.distributedlog.impl; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.LogSegmentMetadata; @@ -30,15 +31,13 @@ import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.metadata.LogMetadata; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -137,7 +136,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(1L); Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); @@ -145,7 +144,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createTxn2 = lsmStore.transaction(); lsmStore.createLogSegment(createTxn2, segment2, null); try { - FutureUtils.result(createTxn2.execute()); + Utils.ioResult(createTxn2.execute()); fail("Should fail if log segment exists"); } catch (Throwable t) { // expected @@ -162,13 +161,13 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(1L); Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); Transaction<Object> deleteTxn = lsmStore.transaction(); lsmStore.deleteLogSegment(deleteTxn, segment, null); - FutureUtils.result(deleteTxn.execute()); + Utils.ioResult(deleteTxn.execute()); assertNull("LogSegment " + segment + " should be deleted", zkc.get().exists(segment.getZkPath(), false)); } @@ -179,7 +178,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> deleteTxn = lsmStore.transaction(); lsmStore.deleteLogSegment(deleteTxn, segment, null); try { - FutureUtils.result(deleteTxn.execute()); + Utils.ioResult(deleteTxn.execute()); fail("Should fail deletion if log segment doesn't exist"); } catch (Throwable t) { assertTrue("Should throw NoNodeException if log segment doesn't exist", @@ -196,7 +195,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); lsmStore.updateLogSegment(updateTxn, segment); try { - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); fail("Should fail update if log segment doesn't exist"); } catch (Throwable t) { assertTrue("Should throw NoNodeException if log segment doesn't exist", @@ -212,17 +211,17 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(1L, 99L); Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); LogSegmentMetadata modifiedSegment = createLogSegment(1L, 999L); Transaction<Object> updateTxn = lsmStore.transaction(); lsmStore.updateLogSegment(updateTxn, modifiedSegment); - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); // the log segment should be updated LogSegmentMetadata readSegment = - FutureUtils.result(LogSegmentMetadata.read(zkc, segment.getZkPath(), true)); + Utils.ioResult(LogSegmentMetadata.read(zkc, segment.getZkPath(), true)); assertEquals("Last entry id should be changed from 99L to 999L", 999L, readSegment.getLastEntryId()); } @@ -234,7 +233,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { // create log segment 1 Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment1, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment1 + " should be created", zkc.get().exists(segment1.getZkPath(), false)); @@ -242,7 +241,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createDeleteTxn = lsmStore.transaction(); lsmStore.createLogSegment(createDeleteTxn, segment2, null); lsmStore.deleteLogSegment(createDeleteTxn, segment1, null); - FutureUtils.result(createDeleteTxn.execute()); + Utils.ioResult(createDeleteTxn.execute()); // segment 1 should be deleted, segment 2 should be created assertNull("LogSegment " + segment1 + " should be deleted", zkc.get().exists(segment1.getZkPath(), false)); @@ -258,7 +257,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { // create log segment 1 Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment1, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment1 + " should be created", zkc.get().exists(segment1.getZkPath(), false)); @@ -268,7 +267,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { lsmStore.deleteLogSegment(createDeleteTxn, segment2, null); lsmStore.createLogSegment(createDeleteTxn, segment3, null); try { - FutureUtils.result(createDeleteTxn.execute()); + Utils.ioResult(createDeleteTxn.execute()); fail("Should fail transaction if one operation failed"); } catch (Throwable t) { assertTrue("Transaction is aborted", @@ -290,12 +289,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(1L, 99L); Transaction<Object> createTxn = lsmStore.transaction(); lsmStore.createLogSegment(createTxn, segment, null); - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); LogSegmentMetadata readSegment = - FutureUtils.result(lsmStore.getLogSegment(segment.getZkPath())); + Utils.ioResult(lsmStore.getLogSegment(segment.getZkPath())); assertEquals("Log segment should match", segment, readSegment); } @@ -309,24 +308,24 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { createdSegments.add(segment); lsmStore.createLogSegment(createTxn, segment, null); } - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); List<String> children = zkc.get().getChildren(rootPath, false); Collections.sort(children); assertEquals("Should find 10 log segments", 10, children.size()); List<String> logSegmentNames = - FutureUtils.result(lsmStore.getLogSegmentNames(rootPath, null)).getValue(); + Utils.ioResult(lsmStore.getLogSegmentNames(rootPath, null)).getValue(); Collections.sort(logSegmentNames); assertEquals("Should find 10 log segments", 10, logSegmentNames.size()); assertEquals(children, logSegmentNames); - List<Future<LogSegmentMetadata>> getFutures = Lists.newArrayListWithExpectedSize(10); + List<CompletableFuture<LogSegmentMetadata>> getFutures = Lists.newArrayListWithExpectedSize(10); for (int i = 0; i < 10; i++) { getFutures.add(lsmStore.getLogSegment(rootPath + "/" + logSegmentNames.get(i))); } List<LogSegmentMetadata> segments = - FutureUtils.result(Future.collect(getFutures)); + Utils.ioResult(FutureUtils.collect(getFutures)); for (int i = 0; i < 10; i++) { assertEquals(createdSegments.get(i), segments.get(i)); } @@ -358,7 +357,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(createTxn, segment, null); } - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); List<String> children = zkc.get().getChildren(rootPath, false); Collections.sort(children); @@ -399,7 +398,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(anotherCreateTxn, segment, null); } - FutureUtils.result(anotherCreateTxn.execute()); + Utils.ioResult(anotherCreateTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); Collections.sort(newChildren); logger.info("All log segments become {}", newChildren); @@ -424,7 +423,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(createTxn, segment, null); } - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); List<String> children = zkc.get().getChildren(rootPath, false); Collections.sort(children); @@ -464,7 +463,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.deleteLogSegment(deleteTxn, segment, null); } - FutureUtils.result(deleteTxn.execute()); + Utils.ioResult(deleteTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); Collections.sort(newChildren); while (numNotifications.get() < 2) { @@ -496,7 +495,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(createTxn, segment, null); } - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); List<String> children = zkc.get().getChildren(rootPath, false); Collections.sort(children); @@ -541,7 +540,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(anotherCreateTxn, segment, null); } - FutureUtils.result(anotherCreateTxn.execute()); + Utils.ioResult(anotherCreateTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); Collections.sort(newChildren); logger.info("All log segments become {}", newChildren); @@ -566,7 +565,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.createLogSegment(createTxn, segment, null); } - FutureUtils.result(createTxn.execute()); + Utils.ioResult(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); List<String> children = zkc.get().getChildren(rootPath, false); Collections.sort(children); @@ -607,7 +606,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment = createLogSegment(i); lsmStore.deleteLogSegment(deleteTxn, segment, null); } - FutureUtils.result(deleteTxn.execute()); + Utils.ioResult(deleteTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); Collections.sort(newChildren); while (numNotifications.get() < 2) { @@ -636,23 +635,23 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxLogSegmentSequenceNumber() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); - FutureUtils.result(updateTxn.execute()); - assertEquals(1, ((ZkVersion) FutureUtils.result(result)).getZnodeVersion()); + Utils.ioResult(updateTxn.execute()); + assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion()); Stat stat = new Stat(); byte[] data = zkc.get().getData(rootZkPath, false, stat); assertEquals(999L, DLUtils.deserializeLogSegmentSequenceNumber(data)); @@ -663,32 +662,32 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxLogSegmentSequenceNumberBadVersion() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); try { - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); fail("Should fail on storing log segment sequence number if providing bad version"); } catch (ZKException zke) { assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode()); } try { - Await.result(result); + Utils.ioResult(result); fail("Should fail on storing log segment sequence number if providing bad version"); - } catch (KeeperException ke) { - assertEquals(KeeperException.Code.BADVERSION, ke.code()); + } catch (ZKException ze) { + assertEquals(KeeperException.Code.BADVERSION, ze.getKeeperExceptionCode()); } Stat stat = new Stat(); byte[] data = zkc.get().getData(rootZkPath, false, stat); @@ -700,7 +699,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxLogSegmentSequenceNumberOnNonExistentPath() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; LogMetadata metadata = mock(LogMetadata.class); when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath); @@ -708,25 +707,25 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); try { - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); fail("Should fail on storing log segment sequence number if path doesn't exist"); } catch (ZKException zke) { assertEquals(KeeperException.Code.NONODE, zke.getKeeperExceptionCode()); } try { - Await.result(result); + Utils.ioResult(result); fail("Should fail on storing log segment sequence number if path doesn't exist"); - } catch (KeeperException ke) { - assertEquals(KeeperException.Code.NONODE, ke.code()); + } catch (ZKException ke) { + assertEquals(KeeperException.Code.NONODE, ke.getKeeperExceptionCode()); } } @@ -734,23 +733,23 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxTxnId() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); - FutureUtils.result(updateTxn.execute()); - assertEquals(1, ((ZkVersion) FutureUtils.result(result)).getZnodeVersion()); + Utils.ioResult(updateTxn.execute()); + assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion()); Stat stat = new Stat(); byte[] data = zkc.get().getData(rootZkPath, false, stat); assertEquals(999L, DLUtils.deserializeTransactionId(data)); @@ -761,32 +760,32 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxTxnIdBadVersion() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); try { - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); fail("Should fail on storing log record transaction id if providing bad version"); } catch (ZKException zke) { assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode()); } try { - Await.result(result); + Utils.ioResult(result); fail("Should fail on storing log record transaction id if providing bad version"); - } catch (KeeperException ke) { - assertEquals(KeeperException.Code.BADVERSION, ke.code()); + } catch (ZKException ze) { + assertEquals(KeeperException.Code.BADVERSION, ze.getKeeperExceptionCode()); } Stat stat = new Stat(); byte[] data = zkc.get().getData(rootZkPath, false, stat); @@ -798,7 +797,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testStoreMaxTxnIdOnNonExistentPath() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); - final Promise<Version> result = new Promise<Version>(); + final CompletableFuture<Version> result = new CompletableFuture<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; LogMetadataForWriter metadata = mock(LogMetadataForWriter.class); when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath); @@ -806,25 +805,25 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { - result.setValue(r); + result.complete(r); } @Override public void onAbort(Throwable t) { - result.setException(t); + result.completeExceptionally(t); } }); try { - FutureUtils.result(updateTxn.execute()); + Utils.ioResult(updateTxn.execute()); fail("Should fail on storing log record transaction id if path doesn't exist"); } catch (ZKException zke) { assertEquals(KeeperException.Code.NONODE, zke.getKeeperExceptionCode()); } try { - Await.result(result); + Utils.ioResult(result); fail("Should fail on storing log record transaction id if path doesn't exist"); - } catch (KeeperException ke) { - assertEquals(KeeperException.Code.NONODE, ke.code()); + } catch (ZKException ze) { + assertEquals(KeeperException.Code.NONODE, ze.getKeeperExceptionCode()); } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java index 3c6e77c..4bd513b 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java @@ -24,7 +24,6 @@ import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientUtils; import org.apache.distributedlog.callback.NamespaceListener; -import org.apache.distributedlog.util.DLUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java index 9c46d96..c81eb1d 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java @@ -25,16 +25,14 @@ import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClientUtils; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.exceptions.LogExistsException; import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.metadata.LogMetadataStore; -import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Utils; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -153,7 +151,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { } private void deleteLog(String logName) throws Exception { - Optional<URI> logUriOptional = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> logUriOptional = Utils.ioResult(metadataStore.getLogLocation(logName)); assertTrue(logUriOptional.isPresent()); URI logUri = logUriOptional.get(); zkc.get().delete(logUri.getPath() + "/" + logName, -1); @@ -164,12 +162,12 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { TestNamespaceListener listener = new TestNamespaceListener(); metadataStore.registerNamespaceListener(listener); String logName = "test-log-1"; - URI logUri = FutureUtils.result(metadataStore.createLog(logName)); + URI logUri = Utils.ioResult(metadataStore.createLog(logName)); assertEquals(uri, logUri); - Optional<URI> logLocation = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> logLocation = Utils.ioResult(metadataStore.getLogLocation(logName)); assertTrue(logLocation.isPresent()); assertEquals(uri, logLocation.get()); - Optional<URI> notExistLogLocation = FutureUtils.result(metadataStore.getLogLocation("non-existent-log")); + Optional<URI> notExistLogLocation = Utils.ioResult(metadataStore.getLogLocation("non-existent-log")); assertFalse(notExistLogLocation.isPresent()); // listener should receive notification listener.waitForDone(); @@ -178,7 +176,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { assertEquals(logName, logsIter.next()); assertFalse(logsIter.hasNext()); // get logs should return the log - Iterator<String> newLogsIter = FutureUtils.result(metadataStore.getLogs()); + Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs()); assertTrue(newLogsIter.hasNext()); assertEquals(logName, newLogsIter.next()); assertFalse(newLogsIter.hasNext()); @@ -191,7 +189,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { metadataStore.registerNamespaceListener(listener1); metadataStore.registerNamespaceListener(listener2); String logName = "test-multiple-listeners"; - URI logUri = FutureUtils.result(metadataStore.createLog(logName)); + URI logUri = Utils.ioResult(metadataStore.createLog(logName)); assertEquals(uri, logUri); listener1.waitForDone(); listener2.waitForDone(); @@ -220,8 +218,8 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { checkStore = metadataStore; } String logName = "test-create-log-" + i; - URI logUri = FutureUtils.result(createStore.createLog(logName)); - Optional<URI> logLocation = FutureUtils.result(checkStore.getLogLocation(logName)); + URI logUri = Utils.ioResult(createStore.createLog(logName)); + Optional<URI> logLocation = Utils.ioResult(checkStore.getLogLocation(logName)); assertTrue("Log " + logName + " doesn't exist", logLocation.isPresent()); assertEquals("Different log location " + logLocation.get() + " is found", logUri, logLocation.get()); @@ -236,10 +234,10 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { conf.addConfiguration(baseConf); String logName = "test-log"; - FutureUtils.result(metadataStore.createLog(logName)); + Utils.ioResult(metadataStore.createLog(logName)); - URI subNs1 = FutureUtils.result(metadataStore.createSubNamespace()); - URI subNs2 = FutureUtils.result(metadataStore.createSubNamespace()); + URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace()); + URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace()); String duplicatedLogName = "test-duplicated-logs"; // Create same log in different sub namespaces @@ -247,35 +245,35 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { metadataStore.createLogInNamespaceSync(subNs2, duplicatedLogName); try { - FutureUtils.result(metadataStore.createLog("non-existent-log")); + Utils.ioResult(metadataStore.createLog("non-existent-log")); fail("should throw exception when duplicated log found"); } catch (UnexpectedException ue) { // should throw unexpected exception assertTrue(metadataStore.duplicatedLogFound.get()); } try { - FutureUtils.result(metadataStore.getLogLocation(logName)); + Utils.ioResult(metadataStore.getLogLocation(logName)); fail("should throw exception when duplicated log found"); } catch (UnexpectedException ue) { // should throw unexpected exception assertTrue(metadataStore.duplicatedLogFound.get()); } try { - FutureUtils.result(metadataStore.getLogLocation("non-existent-log")); + Utils.ioResult(metadataStore.getLogLocation("non-existent-log")); fail("should throw exception when duplicated log found"); } catch (UnexpectedException ue) { // should throw unexpected exception assertTrue(metadataStore.duplicatedLogFound.get()); } try { - FutureUtils.result(metadataStore.getLogLocation(duplicatedLogName)); + Utils.ioResult(metadataStore.getLogLocation(duplicatedLogName)); fail("should throw exception when duplicated log found"); } catch (UnexpectedException ue) { // should throw unexpected exception assertTrue(metadataStore.duplicatedLogFound.get()); } try { - FutureUtils.result(metadataStore.getLogs()); + Utils.ioResult(metadataStore.getLogs()); fail("should throw exception when duplicated log found"); } catch (UnexpectedException ue) { // should throw unexpected exception @@ -286,10 +284,10 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { @Test(timeout = 60000) public void testGetLogLocationWhenCacheMissed() throws Exception { String logName = "test-get-location-when-cache-missed"; - URI logUri = FutureUtils.result(metadataStore.createLog(logName)); + URI logUri = Utils.ioResult(metadataStore.createLog(logName)); assertEquals(uri, logUri); metadataStore.removeLogFromCache(logName); - Optional<URI> logLocation = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> logLocation = Utils.ioResult(metadataStore.getLogLocation(logName)); assertTrue(logLocation.isPresent()); assertEquals(logUri, logLocation.get()); } @@ -297,25 +295,25 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { @Test(timeout = 60000, expected = LogExistsException.class) public void testCreateLogWhenCacheMissed() throws Exception { String logName = "test-create-log-when-cache-missed"; - URI logUri = FutureUtils.result(metadataStore.createLog(logName)); + URI logUri = Utils.ioResult(metadataStore.createLog(logName)); assertEquals(uri, logUri); metadataStore.removeLogFromCache(logName); - FutureUtils.result(metadataStore.createLog(logName)); + Utils.ioResult(metadataStore.createLog(logName)); } @Test(timeout = 60000, expected = LogExistsException.class) public void testCreateLogWhenLogExists() throws Exception { String logName = "test-create-log-when-log-exists"; - URI logUri = FutureUtils.result(metadataStore.createLog(logName)); + URI logUri = Utils.ioResult(metadataStore.createLog(logName)); assertEquals(uri, logUri); - FutureUtils.result(metadataStore.createLog(logName)); + Utils.ioResult(metadataStore.createLog(logName)); } private Set<String> createLogs(int numLogs, String prefix) throws Exception { Set<String> expectedLogs = Sets.newTreeSet(); for (int i = 0; i < numLogs; i++) { String logName = prefix + i; - FutureUtils.result(metadataStore.createLog(logName)); + Utils.ioResult(metadataStore.createLog(logName)); expectedLogs.add(logName); } return expectedLogs; @@ -339,7 +337,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { do { TimeUnit.MILLISECONDS.sleep(20); receivedLogs = new TreeSet<String>(); - Iterator<String> logs = FutureUtils.result(metadataStore.getLogs()); + Iterator<String> logs = Utils.ioResult(metadataStore.getLogs()); receivedLogs.addAll(Lists.newArrayList(logs)); } while (receivedLogs.size() < numLogs); assertEquals(numLogs, receivedLogs.size()); @@ -372,8 +370,8 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { @Test(timeout = 60000) public void testCreateLogPickingFirstAvailableSubNamespace() throws Exception { - URI subNs1 = FutureUtils.result(metadataStore.createSubNamespace()); - URI subNs2 = FutureUtils.result(metadataStore.createSubNamespace()); + URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace()); + URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace()); Set<String> logs0 = createLogs(uri, maxLogsPerSubnamespace - 1, "test-ns0-"); Set<String> logs1 = createLogs(subNs1, maxLogsPerSubnamespace, "test-ns1-"); @@ -388,7 +386,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { do { TimeUnit.MILLISECONDS.sleep(20); receivedLogs = new TreeSet<String>(); - Iterator<String> logs = FutureUtils.result(metadataStore.getLogs()); + Iterator<String> logs = Utils.ioResult(metadataStore.getLogs()); receivedLogs.addAll(Lists.newArrayList(logs)); } while (receivedLogs.size() < 3 * maxLogsPerSubnamespace - 1); @@ -396,19 +394,19 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { new TestNamespaceListenerWithExpectedSize(3 * maxLogsPerSubnamespace + 1); metadataStore.registerNamespaceListener(listener); - Set<URI> uris = FutureUtils.result(metadataStore.fetchSubNamespaces(null)); + Set<URI> uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null)); assertEquals(3, uris.size()); String testLogName = "test-pick-first-available-ns"; - URI createdURI = FutureUtils.result(metadataStore.createLog(testLogName)); + URI createdURI = Utils.ioResult(metadataStore.createLog(testLogName)); allLogs.add(testLogName); assertEquals(uri, createdURI); - uris = FutureUtils.result(metadataStore.fetchSubNamespaces(null)); + uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null)); assertEquals(3, uris.size()); testLogName = "test-create-new-ns"; - URI newURI = FutureUtils.result(metadataStore.createLog(testLogName)); + URI newURI = Utils.ioResult(metadataStore.createLog(testLogName)); allLogs.add(testLogName); assertFalse(uris.contains(newURI)); - uris = FutureUtils.result(metadataStore.fetchSubNamespaces(null)); + uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null)); assertEquals(4, uris.size()); listener.waitForDone(); @@ -435,7 +433,7 @@ public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase { .build(); FederatedZKLogMetadataStore anotherMetadataStore = new FederatedZKLogMetadataStore(anotherConf, uri, anotherZkc, scheduler); - FutureUtils.result(anotherMetadataStore.createLog(testLogName)); + Utils.ioResult(anotherMetadataStore.createLog(testLogName)); listener.waitForDone(); Set<String> receivedLogs = listener.getResult(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java index 5505259..a70edf5 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java @@ -18,13 +18,14 @@ package org.apache.distributedlog.impl.logsegment; import com.google.common.collect.Lists; -import org.apache.distributedlog.AsyncLogWriter; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.Entry; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.LogRecordWithDLSN; @@ -37,10 +38,8 @@ import org.apache.distributedlog.exceptions.ReadCancelledException; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.After; import org.junit.Before; @@ -112,7 +111,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { null, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); - return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId)); + return (BKLogSegmentEntryReader) Utils.ioResult(store.openReader(segment, startEntryId)); } void generateCompletedLogSegments(DistributedLogManager dlm, @@ -121,12 +120,12 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { long segmentSize) throws Exception { long txid = 1L; for (long i = 0; i < numCompletedSegments; i++) { - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); for (long j = 1; j <= segmentSize; j++) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid); ctrlRecord.setControl(); - FutureUtils.result(writer.write(ctrlRecord)); + Utils.ioResult(writer.write(ctrlRecord)); } Utils.close(writer); } @@ -135,12 +134,12 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm, DistributedLogConfiguration conf, long segmentSize) throws Exception { - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); for (long i = 1L; i <= segmentSize; i++) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i); ctrlRecord.setControl(); - FutureUtils.result(writer.write(ctrlRecord)); + Utils.ioResult(writer.write(ctrlRecord)); } return writer; } @@ -168,7 +167,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { while (!done) { Entry.Reader entryReader; try { - entryReader = FutureUtils.result(reader.readNext(1)).get(0); + entryReader = Utils.ioResult(reader.readNext(1)).get(0); } catch (EndOfLogSegmentException eol) { done = true; continue; @@ -205,15 +204,15 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { 1, segments.size()); BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); - List<Future<List<Entry.Reader>>> futures = Lists.newArrayList(); + List<CompletableFuture<List<Entry.Reader>>> futures = Lists.newArrayList(); for (int i = 0; i < 5; i++) { futures.add(reader.readNext(1)); } assertFalse("Reader should not be closed yet", reader.isClosed()); Utils.close(reader); - for (Future<List<Entry.Reader>> future : futures) { + for (CompletableFuture<List<Entry.Reader>> future : futures) { try { - FutureUtils.result(future); + Utils.ioResult(future); fail("The read request should be cancelled"); } catch (ReadCancelledException rce) { // expected @@ -253,7 +252,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals(10, reader.getNextEntryId()); assertFalse(reader.hasCaughtUpOnInprogress()); // read first entry - Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); while (null != record) { if (!record.isControl()) { @@ -309,7 +308,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals(5, reader.readAheadEntries.size()); assertEquals(5, reader.getNextEntryId()); // read first entry - Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); while (null != record) { if (!record.isControl()) { @@ -365,7 +364,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals((reader.getLastAddConfirmed() + 1), reader.readAheadEntries.size()); assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId()); // read first entry - Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); while (null != record) { if (!record.isControl()) { @@ -415,7 +414,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { long txId = 1L; long entryId = 0L; while (true) { - Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); while (null != record) { if (!record.isControl()) { @@ -435,11 +434,11 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { } assertEquals(6L, txId); - Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1); + CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1); // write another record to commit previous writes - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId))); // the long poll will be satisfied - List<Entry.Reader> nextReadEntries = FutureUtils.result(nextReadFuture); + List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture); assertEquals(1, nextReadEntries.size()); assertTrue(reader.hasCaughtUpOnInprogress()); Entry.Reader entryReader = nextReadEntries.get(0); @@ -486,7 +485,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { long txId = 1L; long entryId = 0L; while (true) { - Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); while (null != record) { if (!record.isControl()) { @@ -506,11 +505,11 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { } assertEquals(6L, txId); - Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1); + CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1); // write another record to commit previous writes - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId))); // the long poll will be satisfied - List<Entry.Reader> nextReadEntries = FutureUtils.result(nextReadFuture); + List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture); assertEquals(1, nextReadEntries.size()); Entry.Reader entryReader = nextReadEntries.get(0); LogRecordWithDLSN record = entryReader.nextRecord(); @@ -528,7 +527,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { ++entryId; // close the writer, the write will be committed Utils.close(writer); - entryReader = FutureUtils.result(reader.readNext(1)).get(0); + entryReader = Utils.ioResult(reader.readNext(1)).get(0); record = entryReader.nextRecord(); assertNotNull(record); assertFalse(record.isControl()); @@ -549,8 +548,8 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { try { // when we closed the log segment, another control record will be // written, so we loop over the reader until we reach end of log segment. - FutureUtils.result(reader.readNext(1)); - FutureUtils.result(reader.readNext(1)); + Utils.ioResult(reader.readNext(1)); + Utils.ioResult(reader.readNext(1)); fail("Should reach end of log segment"); } catch (EndOfLogSegmentException eol) { // expected http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java index f67de35..813501b 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java @@ -20,19 +20,17 @@ package org.apache.distributedlog.impl.metadata; import com.google.common.collect.Lists; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.MetadataAccessor; +import org.apache.distributedlog.api.MetadataAccessor; import org.apache.distributedlog.TestZooKeeperClientBuilder; -import org.apache.distributedlog.impl.metadata.BKDLConfig; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.metadata.DLMetadata; import org.apache.distributedlog.metadata.LogMetadataForWriter; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClusterTestCase; import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Utils; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.util.ZkUtils; @@ -129,7 +127,7 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { public void testCheckLogMetadataPathsWithAllocator() throws Exception { String logRootPath = "/" + testName.getMethodName(); List<Versioned<byte[]>> metadatas = - FutureUtils.result(checkLogMetadataPaths( + Utils.ioResult(checkLogMetadataPaths( zkc.get(), logRootPath, true)); assertEquals("Should have 8 paths", 8, metadatas.size()); @@ -143,7 +141,7 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { public void testCheckLogMetadataPathsWithoutAllocator() throws Exception { String logRootPath = "/" + testName.getMethodName(); List<Versioned<byte[]>> metadatas = - FutureUtils.result(checkLogMetadataPaths( + Utils.ioResult(checkLogMetadataPaths( zkc.get(), logRootPath, false)); assertEquals("Should have 7 paths", 7, metadatas.size()); @@ -169,12 +167,12 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { } LogMetadataForWriter logMetadata = - FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true)); + Utils.ioResult(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true)); final String logRootPath = getLogRootPath(uri, logName, logIdentifier); List<Versioned<byte[]>> metadatas = - FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator)); + Utils.ioResult(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator)); if (ownAllocator) { assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator, @@ -301,7 +299,7 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception { String logName = testName.getMethodName(); String logIdentifier = "<default>"; - FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, false)); + Utils.ioResult(getLog(uri, logName, logIdentifier, zkc, true, false)); } @Test(timeout = 60000) @@ -312,7 +310,7 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(new DistributedLogConfiguration()) .uri(uri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java index b2eee34..26cf979 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java @@ -17,20 +17,18 @@ */ package org.apache.distributedlog.lock; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.util.FailpointUtils; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.TestDistributedLogBase; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClientUtils; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.CreateMode; @@ -192,9 +190,9 @@ public class TestDistributedLock extends TestDistributedLogBase { private static void checkLockAndReacquire(ZKDistributedLock lock, boolean sync) throws Exception { lock.checkOwnershipAndReacquire(); - Future<ZKDistributedLock> reacquireFuture = lock.getLockReacquireFuture(); + CompletableFuture<ZKDistributedLock> reacquireFuture = lock.getLockReacquireFuture(); if (null != reacquireFuture && sync) { - FutureUtils.result(reacquireFuture); + Utils.ioResult(reacquireFuture); } } @@ -212,7 +210,7 @@ public class TestDistributedLock extends TestDistributedLogBase { try { ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); fail("Should fail on creating lock if couldn't establishing connections to zookeeper"); } catch (IOException ioe) { // expected. @@ -228,7 +226,7 @@ public class TestDistributedLock extends TestDistributedLogBase { try { ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); fail("Should fail on creating lock if couldn't establishing connections to zookeeper after 3 retries"); } catch (IOException ioe) { // expected. @@ -243,14 +241,14 @@ public class TestDistributedLock extends TestDistributedLogBase { try { ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); Pair<String, Long> lockId1 = ((ZKSessionLock) lock.getInternalLock()).getLockId(); List<String> children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertTrue(lock.haveLock()); - assertEquals(lockId1, Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lockId1, Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); lock.asyncClose(); } finally { @@ -268,16 +266,16 @@ public class TestDistributedLock extends TestDistributedLogBase { SessionLockFactory lockFactory = createLockFactory(clientId, zkc); ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); Pair<String, Long> lockId1 = ((ZKSessionLock) lock.getInternalLock()).getLockId(); List<String> children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertTrue(lock.haveLock()); - assertEquals(lockId1, Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lockId1, Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); - FutureUtils.result(lock.asyncClose()); + Utils.ioResult(lock.asyncClose()); children = getLockWaiters(zkc, lockPath); assertEquals(0, children.size()); @@ -285,25 +283,25 @@ public class TestDistributedLock extends TestDistributedLogBase { lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); Pair<String, Long> lockId2 = ((ZKSessionLock) lock.getInternalLock()).getLockId(); children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertTrue(lock.haveLock()); - assertEquals(lockId2, Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lockId2, Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); assertEquals(lockId1, lockId2); - FutureUtils.result(lock.asyncClose()); + Utils.ioResult(lock.asyncClose()); children = getLockWaiters(zkc, lockPath); assertEquals(0, children.size()); assertFalse(lock.haveLock()); try { - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); fail("Should fail on acquiring a closed lock"); } catch (UnexpectedException le) { // expected. @@ -324,7 +322,7 @@ public class TestDistributedLock extends TestDistributedLogBase { ZKDistributedLock lock0 = new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock0.asyncAcquire()); + Utils.ioResult(lock0.asyncAcquire()); Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId(); @@ -332,7 +330,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(1, children.size()); assertTrue(lock0.haveLock()); assertEquals(lockId0_1, - Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); // expire the session ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs); @@ -347,7 +345,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(1, children.size()); assertTrue(lock0.haveLock()); assertEquals(lockId0_2, - Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); SessionLockFactory lockFactory = createLockFactory(clientId, zkc); @@ -359,9 +357,9 @@ public class TestDistributedLock extends TestDistributedLogBase { @Override public void run() { try { - FutureUtils.result(lock1.asyncAcquire()); + Utils.ioResult(lock1.asyncAcquire()); lockLatch.countDown(); - } catch (IOException e) { + } catch (Exception e) { logger.error("Failed on locking lock1 : ", e); } } @@ -424,7 +422,7 @@ public class TestDistributedLock extends TestDistributedLogBase { ZKDistributedLock lock0 = new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock0.asyncAcquire()); + Utils.ioResult(lock0.asyncAcquire()); Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId(); @@ -432,7 +430,7 @@ public class TestDistributedLock extends TestDistributedLogBase { assertEquals(1, children.size()); assertTrue(lock0.haveLock()); assertEquals(lockId0_1, - Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs); @@ -441,13 +439,13 @@ public class TestDistributedLock extends TestDistributedLogBase { checkLockAndReacquire(lock0, false); } else { // session expire will trigger lock re-acquisition - Future<ZKDistributedLock> asyncLockAcquireFuture; + CompletableFuture<ZKDistributedLock> asyncLockAcquireFuture; do { Thread.sleep(1); asyncLockAcquireFuture = lock0.getLockReacquireFuture(); } while (null == asyncLockAcquireFuture && lock0.getReacquireCount() < 1); if (null != asyncLockAcquireFuture) { - Await.result(asyncLockAcquireFuture); + Utils.ioResult(asyncLockAcquireFuture); } checkLockAndReacquire(lock0, false); } @@ -456,11 +454,11 @@ public class TestDistributedLock extends TestDistributedLogBase { assertTrue(lock0.haveLock()); Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId(); assertEquals(lock0_2, - Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); assertEquals(clientId, lock0_2.getLeft()); assertFalse(lockId0_1.equals(lock0_2)); - FutureUtils.result(lock0.asyncClose()); + Utils.ioResult(lock0.asyncClose()); children = getLockWaiters(zkc, lockPath); assertEquals(0, children.size()); @@ -495,7 +493,7 @@ public class TestDistributedLock extends TestDistributedLogBase { ZKDistributedLock lock0 = new ZKDistributedLock(lockStateExecutor, lockFactory0, lockPath, Long.MAX_VALUE, NullStatsLogger.INSTANCE); - FutureUtils.result(lock0.asyncAcquire()); + Utils.ioResult(lock0.asyncAcquire()); final CountDownLatch lock1DoneLatch = new CountDownLatch(1); SessionLockFactory lockFactory1 = createLockFactory(clientId, zkc); @@ -506,9 +504,9 @@ public class TestDistributedLock extends TestDistributedLogBase { @Override public void run() { try { - FutureUtils.result(lock1.asyncAcquire()); + Utils.ioResult(lock1.asyncAcquire()); lock1DoneLatch.countDown(); - } catch (IOException e) { + } catch (Exception e) { logger.error("Error on acquiring lock1 : ", e); } } @@ -524,9 +522,9 @@ public class TestDistributedLock extends TestDistributedLogBase { assertTrue(lock0.haveLock()); assertFalse(lock1.haveLock()); assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(), - Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(), - Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); logger.info("Expiring session on lock0"); ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs); @@ -553,14 +551,14 @@ public class TestDistributedLock extends TestDistributedLogBase { } else { logger.info("Waiting lock0 to attempt acquisition after session expired"); // session expire will trigger lock re-acquisition - Future<ZKDistributedLock> asyncLockAcquireFuture; + CompletableFuture<ZKDistributedLock> asyncLockAcquireFuture; do { Thread.sleep(1); asyncLockAcquireFuture = lock0.getLockReacquireFuture(); } while (null == asyncLockAcquireFuture); try { - Await.result(asyncLockAcquireFuture); + Utils.ioResult(asyncLockAcquireFuture); fail("Should fail check write lock since lock is already held by other people"); } catch (OwnershipAcquireFailedException oafe) { assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(), @@ -579,10 +577,10 @@ public class TestDistributedLock extends TestDistributedLogBase { assertFalse(lock0.haveLock()); assertTrue(lock1.haveLock()); assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(), - Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); - FutureUtils.result(lock0.asyncClose()); - FutureUtils.result(lock1.asyncClose()); + Utils.ioResult(lock0.asyncClose()); + Utils.ioResult(lock1.asyncClose()); children = getLockWaiters(zkc, lockPath); assertEquals(0, children.size()); @@ -597,7 +595,7 @@ public class TestDistributedLock extends TestDistributedLogBase { SessionLockFactory lockFactory = createLockFactory(clientId, zkc, conf.getLockTimeoutMilliSeconds(), 0); ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); // try and cleanup the underlying lock lock.getInternalLock().unlock(); @@ -614,14 +612,14 @@ public class TestDistributedLock extends TestDistributedLogBase { boolean exceptionEncountered = false; try { - FutureUtils.result(lock2.asyncAcquire()); + Utils.ioResult(lock2.asyncAcquire()); } catch (OwnershipAcquireFailedException exc) { assertEquals(clientId, exc.getCurrentOwner()); exceptionEncountered = true; } assertTrue(exceptionEncountered); - FutureUtils.result(lock.asyncClose()); - FutureUtils.result(lock2.asyncClose()); + Utils.ioResult(lock.asyncClose()); + Utils.ioResult(lock2.asyncClose()); } @Test(timeout = 60000) @@ -633,7 +631,7 @@ public class TestDistributedLock extends TestDistributedLogBase { SessionLockFactory factory = createLockFactory(clientId, zkc, conf.getLockTimeoutMilliSeconds(), 0); ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, factory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE); - FutureUtils.result(lock.asyncAcquire()); + Utils.ioResult(lock.asyncAcquire()); // try and cleanup the underlying lock lock.getInternalLock().unlock(); @@ -650,15 +648,15 @@ public class TestDistributedLock extends TestDistributedLogBase { boolean exceptionEncountered = false; try { - FutureUtils.result(lock2.asyncAcquire()); + Utils.ioResult(lock2.asyncAcquire()); } catch (OwnershipAcquireFailedException exc) { assertEquals(clientId, exc.getCurrentOwner()); exceptionEncountered = true; } assertTrue(exceptionEncountered); - FutureUtils.result(lock2.asyncClose()); + Utils.ioResult(lock2.asyncClose()); - FutureUtils.result(lock.asyncClose()); + Utils.ioResult(lock.asyncClose()); assertEquals(false, lock.haveLock()); assertEquals(false, lock.getInternalLock().isLockHeld()); @@ -666,10 +664,10 @@ public class TestDistributedLock extends TestDistributedLogBase { ZKDistributedLock lock3 = new ZKDistributedLock(lockStateExecutor, factory, lockPath, 0, NullStatsLogger.INSTANCE); - FutureUtils.result(lock3.asyncAcquire()); + Utils.ioResult(lock3.asyncAcquire()); assertEquals(true, lock3.haveLock()); assertEquals(true, lock3.getInternalLock().isLockHeld()); - FutureUtils.result(lock3.asyncClose()); + Utils.ioResult(lock3.asyncClose()); } void assertLatchesSet(CountDownLatch[] latches, int endIndex) { @@ -697,8 +695,8 @@ public class TestDistributedLock extends TestDistributedLogBase { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); int count = 3; - ArrayList<Future<ZKDistributedLock>> results = - new ArrayList<Future<ZKDistributedLock>>(count); + ArrayList<CompletableFuture<ZKDistributedLock>> results = + new ArrayList<CompletableFuture<ZKDistributedLock>>(count); ZKDistributedLock[] lockArray = new ZKDistributedLock[count]; final CountDownLatch[] latches = new CountDownLatch[count]; @@ -708,7 +706,7 @@ public class TestDistributedLock extends TestDistributedLogBase { latches[i] = new CountDownLatch(1); lockArray[i] = locks.createLock(i, zkc); final int index = i; - results.add(lockArray[i].asyncAcquire().addEventListener( + results.add(lockArray[i].asyncAcquire().whenComplete( new FutureEventListener<ZKDistributedLock>() { @Override public void onSuccess(ZKDistributedLock lock) { @@ -727,8 +725,8 @@ public class TestDistributedLock extends TestDistributedLogBase { for (int i = 0; i < count; i++) { latches[i].await(); assertLatchesSet(latches, i+1); - Await.result(results.get(i)); - FutureUtils.result(lockArray[i].asyncClose()); + Utils.ioResult(results.get(i)); + Utils.ioResult(lockArray[i].asyncClose()); } } @@ -738,7 +736,7 @@ public class TestDistributedLock extends TestDistributedLogBase { final ZKDistributedLock lock0 = locks.createLock(0, zkc); final ZKDistributedLock lock1 = locks.createLock(1, zkc0); - FutureUtils.result(lock0.asyncAcquire()); + Utils.ioResult(lock0.asyncAcquire()); // Initial state. assertLockState(lock0, true, true, lock1, false, false, 1, locks.getLockPath()); @@ -747,8 +745,8 @@ public class TestDistributedLock extends TestDistributedLogBase { @Override public void run() { try { - FutureUtils.result(lock1.asyncAcquire()); - } catch (IOException e) { + Utils.ioResult(lock1.asyncAcquire()); + } catch (Exception e) { fail("shouldn't fail to acquire"); } } @@ -761,13 +759,13 @@ public class TestDistributedLock extends TestDistributedLogBase { } assertLockState(lock0, true, true, lock1, false, false, 2, locks.getLockPath()); - FutureUtils.result(lock0.asyncClose()); - Await.result(lock1.getLockAcquireFuture()); + Utils.ioResult(lock0.asyncClose()); + Utils.ioResult(lock1.getLockAcquireFuture()); assertLockState(lock0, false, false, lock1, true, true, 1, locks.getLockPath()); // Release lock1 - FutureUtils.result(lock1.asyncClose()); + Utils.ioResult(lock1.asyncClose()); assertLockState(lock0, false, false, lock1, false, false, 0, locks.getLockPath()); } @@ -777,8 +775,8 @@ public class TestDistributedLock extends TestDistributedLogBase { final ZKDistributedLock lock0 = locks.createLock(0, zkc); final ZKDistributedLock lock1 = locks.createLock(1, zkc0); - FutureUtils.result(lock0.asyncAcquire()); - Future<ZKDistributedLock> result = lock1.asyncAcquire(); + Utils.ioResult(lock0.asyncAcquire()); + CompletableFuture<ZKDistributedLock> result = lock1.asyncAcquire(); // make sure we place a waiter for lock1 while (null == lock1.getLockWaiter()) { TimeUnit.MILLISECONDS.sleep(20); @@ -787,7 +785,7 @@ public class TestDistributedLock extends TestDistributedLogBase { // Expire causes acquire future to be failed and unset. ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs); try { - Await.result(result); + Utils.ioResult(result); fail("future should have been failed"); } catch (OwnershipAcquireFailedException ex) { } @@ -803,11 +801,11 @@ public class TestDistributedLock extends TestDistributedLogBase { final ZKDistributedLock lock0 = locks.createLock(0, zkc); final ZKDistributedLock lock1 = locks.createLock(1, zkc0); - FutureUtils.result(lock0.asyncAcquire()); - Future<ZKDistributedLock> result = lock1.asyncAcquire(); - FutureUtils.result(lock1.asyncClose()); + Utils.ioResult(lock0.asyncAcquire()); + CompletableFuture<ZKDistributedLock> result = lock1.asyncAcquire(); + Utils.ioResult(lock1.asyncClose()); try { - Await.result(result); + Utils.ioResult(result); fail("future should have been failed"); } catch (LockClosedException ex) { } @@ -821,12 +819,12 @@ public class TestDistributedLock extends TestDistributedLogBase { TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor); final ZKDistributedLock lock0 = locks.createLock(0, zkc); - Future<ZKDistributedLock> result = lock0.asyncAcquire(); - Await.result(result); - FutureUtils.result(lock0.asyncClose()); + CompletableFuture<ZKDistributedLock> result = lock0.asyncAcquire(); + Utils.ioResult(result); + Utils.ioResult(lock0.asyncClose()); // Already have this, stays satisfied. - Await.result(result); + Utils.ioResult(result); // But we no longer have the lock. assertEquals(false, lock0.haveLock());