http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java index 2f5766d..5123178 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java @@ -22,6 +22,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -30,16 +31,22 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; -import org.apache.distributedlog.config.ConcurrentBaseConfiguration; -import org.apache.distributedlog.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.LogWriter; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.io.CompressionCodec; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Promise; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAccessor; import org.apache.bookkeeper.client.LedgerHandle; @@ -63,15 +70,10 @@ import org.apache.distributedlog.exceptions.OverCapacityException; import org.apache.distributedlog.exceptions.ReadCancelledException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.lock.DistributedLock; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.SimplePermitLimiter; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; import junit.framework.Assert; import static com.google.common.base.Charsets.UTF_8; @@ -111,13 +113,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long i = 0; i < 3; i++) { final long currentLogSegmentSeqNo = i + 1; BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); - DLSN dlsn = Await.result(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8)))); + DLSN dlsn = Utils.ioResult(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8)))); assertEquals(currentLogSegmentSeqNo, dlsn.getLogSegmentSequenceNo()); assertEquals(0, dlsn.getEntryId()); assertEquals(0, dlsn.getSlotId()); for (long j = 1; j < 10; j++) { final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Await.result(writer.write(record)); + Utils.ioResult(writer.write(record)); } writer.closeAndComplete(); } @@ -161,8 +163,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // Write one record larger than max seg size. Ledger doesn't roll until next write. int txid = 1; LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048); - Future<DLSN> result = writer.write(record); - DLSN dlsn = Await.result(result, Duration.fromSeconds(10)); + CompletableFuture<DLSN> result = writer.write(record); + DLSN dlsn = Utils.ioResult(result, 10, TimeUnit.SECONDS); assertEquals(1, dlsn.getLogSegmentSequenceNo()); record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1); @@ -207,8 +209,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long j = 0; j < numRecordsPerLogSegment; j++) { final long currentEntryId = j; final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { if(value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) { @@ -245,7 +247,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertEquals("Last DLSN" + last.getDlsn() + " isn't the maximum DLSN " + maxDLSN.get(), last.getDlsn(), maxDLSN.get()); assertEquals(last.getDlsn(), dlm.getLastDLSN()); - assertEquals(last.getDlsn(), Await.result(dlm.getLastDLSNAsync())); + assertEquals(last.getDlsn(), Utils.ioResult(dlm.getLastDLSNAsync())); DLMTestUtil.verifyLargeLogRecord(last); dlm.close(); @@ -330,8 +332,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { final CountDownLatch syncLatch, final CountDownLatch completionLatch, final AtomicBoolean errorsFound) { - Future<LogRecordWithDLSN> record = reader.readNext(); - record.addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + CompletableFuture<LogRecordWithDLSN> record = reader.readNext(); + record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN value) { try { @@ -455,7 +457,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { if (expectedTxID == numLogSegments * numRecordsPerLogSegment) { break; } - List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20)); + List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20)); LOG.info("Bulk read {} entries.", records.size()); assertTrue(records.size() >= 1); @@ -495,7 +497,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long i = 0; i < 3; i++) { // since we batched 20 entries into single bookkeeper entry // we should be able to read 20 entries as a batch. - List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20)); + List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20)); assertEquals(20, records.size()); for (LogRecordWithDLSN record : records) { assertEquals(expectedTxID, record.getTransactionId()); @@ -531,7 +533,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { name, asyncReader.getStreamName()); long numTrans = 0; DLSN lastDLSN = DLSN.InvalidDLSN; - LogRecordWithDLSN record = Await.result(asyncReader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(asyncReader.readNext()); while (null != record) { DLMTestUtil.verifyEmptyLogRecord(record); assertEquals(0, record.getDlsn().getSlotId()); @@ -541,7 +543,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { if (numTrans >= (txid - 1)) { break; } - record = Await.result(asyncReader.readNext()); + record = Utils.ioResult(asyncReader.readNext()); } assertEquals((txid - 1), numTrans); Utils.close(asyncReader); @@ -715,8 +717,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long j = 0; j < 10; j++) { final long currentEntryId = j; final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new WriteFutureEventListener( + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new WriteFutureEventListener( record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); if (i == 0 && j == 0) { boolean monotonic = LogSegmentMetadata.supportsSequenceId(logSegmentVersion); @@ -793,8 +795,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long j = 0; j < 10; j++) { final long currentEntryId = j; final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new WriteFutureEventListener( + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new WriteFutureEventListener( record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); } writer.closeAndComplete(); @@ -835,7 +837,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { URI uri = createDLMURI("/" + name); ensureURICreated(uri); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(confLocal).uri(uri).build(); final DistributedLogManager[] dlms = new DistributedLogManager[count]; final TestReader[] readers = new TestReader[count]; @@ -868,8 +870,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { final long currentEntryId = j; final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); for (int s = 0; s < count; s++) { - Future<DLSN> dlsnFuture = writers[s].write(record); - dlsnFuture.addEventListener(new WriteFutureEventListener( + CompletableFuture<DLSN> dlsnFuture = writers[s].write(record); + dlsnFuture.whenComplete(new WriteFutureEventListener( record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); } } @@ -937,8 +939,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long j = 0; j < numRecordsPerLogSegment; j++) { final long currentEntryId = j; final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new WriteFutureEventListener( + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new WriteFutureEventListener( record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); } writer.closeAndComplete(); @@ -988,8 +990,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long j = 0; j < numRecordsPerLogSegment; j++) { Thread.sleep(50); final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new WriteFutureEventListener( + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new WriteFutureEventListener( record, currentLogSegmentSeqNo, j, writeLatch, writeErrors, false)); if (i == 0 && j == 0) { boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion()); @@ -1027,7 +1029,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { writer.closeAndComplete(); final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertEquals(1L, record.getTransactionId()); DLMTestUtil.verifyLogRecord(record); @@ -1037,7 +1039,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { @Override public void run() { try { - Await.result(reader.readNext()); + Utils.ioResult(reader.readNext()); } catch (ReadCancelledException rce) { receiveExpectedException.set(true); } catch (Throwable t) { @@ -1060,7 +1062,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // closed reader should reject any readNext try { - Await.result(reader.readNext()); + Utils.ioResult(reader.readNext()); fail("Reader should reject readNext if it is closed."); } catch (ReadCancelledException rce) { // expected @@ -1087,8 +1089,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (long i = 0; i < COUNT; i++) { Thread.sleep(1); final LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); - Future<DLSN> dlsnFuture = writer.write(record); - dlsnFuture.addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> dlsnFuture = writer.write(record); + dlsnFuture.whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { syncLatch.countDown(); @@ -1142,10 +1144,10 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { URI uri = createDLMURI("/" + name); ensureURICreated(uri); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(confLocal).uri(uri).clientId("gabbagoo").build(); DistributedLogManager dlm = namespace.openLog(name); - DistributedLogNamespace namespace1 = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace1 = NamespaceBuilder.newBuilder() .conf(confLocal).uri(uri).clientId("tortellini").build(); DistributedLogManager dlm1 = namespace1.openLog(name); @@ -1153,12 +1155,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); // First write succeeds since lock isnt checked until transmit, which is scheduled - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); writer.flushAndCommit(); BKLogSegmentWriter perStreamWriter = writer.getCachedLogWriter(); DistributedLock lock = perStreamWriter.getLock(); - FutureUtils.result(lock.asyncClose()); + Utils.ioResult(lock.asyncClose()); // Get second writer, steal lock BKAsyncLogWriter writer2 = (BKAsyncLogWriter)(dlm1.startAsyncLogSegmentNonPartitioned()); @@ -1169,7 +1171,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // Succeeds, kicks off scheduled flush Thread.sleep(100); - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); fail("should have thrown"); } catch (LockingException ex) { LOG.debug("caught exception ", ex); @@ -1194,13 +1196,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { dlm = createNewDLM(confLocal, runtime.getMethodName()); } BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); - ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000); + ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000); for (int i = 0; i < 1000; i++) { results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L))); } - for (Future<DLSN> result : results) { + for (CompletableFuture<DLSN> result : results) { try { - Await.result(result); + Utils.ioResult(result); if (shouldFail) { fail("should fail due to no outstanding writes permitted"); } @@ -1242,12 +1244,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setOutstandingWriteLimitDarkmode(true); DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); - ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000); + ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000); for (int i = 0; i < 1000; i++) { results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L))); } - for (Future<DLSN> result : results) { - Await.result(result); + for (CompletableFuture<DLSN> result : results) { + Utils.ioResult(result); } writer.closeAndComplete(); dlm.close(); @@ -1266,7 +1268,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { long txId = 1L; for (int i = 0; i < 5; i++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); } BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); @@ -1277,7 +1279,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); try { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); fail("Should fail write to a fenced ledger with BKTransmitException"); } catch (BKTransmitException bkte) { // expected @@ -1310,7 +1312,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { long txId = 1L; for (int i = 0; i < 5; i++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); } BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); @@ -1408,8 +1410,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { int recordCount = 0; try { while (true) { - Future<LogRecordWithDLSN> record = reader.readNext(); - Await.result(record); + CompletableFuture<LogRecordWithDLSN> record = reader.readNext(); + Utils.ioResult(record); recordCount++; if (recordCount >= segmentSize * numSegments) { @@ -1465,7 +1467,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); writer.abort(); for (int i = 0; i < 2; i++) { @@ -1548,8 +1550,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { int recordCount = 0; try { while (true) { - Future<LogRecordWithDLSN> record = reader.readNext(); - Await.result(record); + CompletableFuture<LogRecordWithDLSN> record = reader.readNext(); + Utils.ioResult(record); if (recordCount == 0) { readLatch.countDown(); } @@ -1582,7 +1584,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { int numRecords = 10; for (int i = 0; i < numRecords; i++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); assertEquals("last tx id should become " + i, i, writer.getLastTxId()); } @@ -1612,16 +1614,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { int numRecords = 40; for (int i = 1; i <= numRecords; i++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); assertEquals("last tx id should become " + i, i, writer.getLastTxId()); } LogRecord record = DLMTestUtil.getLogRecordInstance(numRecords); record.setControl(); - Await.result(writer.write(record)); + Utils.ioResult(writer.write(record)); BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); - record = Await.result(reader.readNext()); + record = Utils.ioResult(reader.readNext()); LOG.info("Read record {}", record); assertEquals(1L, record.getTransactionId()); @@ -1629,7 +1631,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertTrue(reader.getReadAheadReader().getNumCachedEntries() <= maxAllowedCachedRecords); for (int i = 2; i <= numRecords; i++) { - record = Await.result(reader.readNext()); + record = Utils.ioResult(reader.readNext()); LOG.info("Read record {}", record); assertEquals((long) i, record.getTransactionId()); TimeUnit.MILLISECONDS.sleep(20); @@ -1656,18 +1658,18 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { final int NUM_RECORDS = 10; int i = 1; for (; i <= NUM_RECORDS; i++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); assertEquals("last tx id should become " + i, i, writer.getLastTxId()); } - Await.result(writer.markEndOfStream()); + Utils.ioResult(writer.markEndOfStream()); // Multiple end of streams are ok. - Await.result(writer.markEndOfStream()); + Utils.ioResult(writer.markEndOfStream()); try { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); fail("Should have thrown"); } catch (EndOfStreamException ex) { } @@ -1675,12 +1677,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); LogRecord record = null; for (int j = 0; j < NUM_RECORDS; j++) { - record = Await.result(reader.readNext()); + record = Utils.ioResult(reader.readNext()); assertEquals(j+1, record.getTransactionId()); } try { - record = Await.result(reader.readNext()); + record = Utils.ioResult(reader.readNext()); fail("Should have thrown"); } catch (EndOfStreamException ex) { } @@ -1698,9 +1700,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager dlm = createNewDLM(confLocal, name); BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); - Await.result(writer.markEndOfStream()); + Utils.ioResult(writer.markEndOfStream()); try { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1))); fail("Should have thrown"); } catch (EndOfStreamException ex) { } @@ -1708,7 +1710,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); try { - LogRecord record = Await.result(reader.readNext()); + LogRecord record = Utils.ioResult(reader.readNext()); fail("Should have thrown"); } catch (EndOfStreamException ex) { } @@ -1726,32 +1728,32 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager dlm = createNewDLM(confLocal, name); BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L); controlRecord.setControl(); - FutureUtils.result(writer.write(controlRecord)); + Utils.ioResult(writer.write(controlRecord)); BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); - Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - Future<LogRecordWithDLSN> readFuture = reader.readNext(); + CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext(); // write another records for (int i = 0; i < 5; i++) { long txid = 2L + i; - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid))); controlRecord = DLMTestUtil.getLogRecordInstance(txid); controlRecord.setControl(); - FutureUtils.result(writer.write(controlRecord)); + Utils.ioResult(writer.write(controlRecord)); } - List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture); + List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture); assertEquals(2, bulkReadRecords.size()); assertEquals(1L, bulkReadRecords.get(0).getTransactionId()); assertEquals(2L, bulkReadRecords.get(1).getTransactionId()); for (LogRecordWithDLSN record : bulkReadRecords) { DLMTestUtil.verifyLogRecord(record); } - LogRecordWithDLSN record = FutureUtils.result(readFuture); + LogRecordWithDLSN record = Utils.ioResult(readFuture); assertEquals(3L, record.getTransactionId()); DLMTestUtil.verifyLogRecord(record); @@ -1771,16 +1773,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager dlm = createNewDLM(confLocal, name); BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L); controlRecord.setControl(); - FutureUtils.result(writer.write(controlRecord)); + Utils.ioResult(writer.write(controlRecord)); BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); - Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS); - Future<LogRecordWithDLSN> readFuture = reader.readNext(); + CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS); + CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext(); - List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture); + List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture); assertEquals(1, bulkReadRecords.size()); assertEquals(1L, bulkReadRecords.get(0).getTransactionId()); for (LogRecordWithDLSN record : bulkReadRecords) { @@ -1790,13 +1792,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // write another records for (int i = 0; i < 5; i++) { long txid = 2L + i; - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid))); controlRecord = DLMTestUtil.getLogRecordInstance(txid); controlRecord.setControl(); - FutureUtils.result(writer.write(controlRecord)); + Utils.ioResult(writer.write(controlRecord)); } - LogRecordWithDLSN record = FutureUtils.result(readFuture); + LogRecordWithDLSN record = Utils.ioResult(readFuture); assertEquals(2L, record.getTransactionId()); DLMTestUtil.verifyLogRecord(record); @@ -1832,7 +1834,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // 3 segments, 10 records each, immediate flush, batch size 1, so just the first // record in each ledger is discarded, for 30 - 3 = 27 records. for (int i = 0; i < 27; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertFalse(record.getDlsn().getEntryId() % 10 == 0); } @@ -1868,7 +1870,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // 3 segments, 10 records each, immediate flush, batch size 1, so just the first // record in each ledger is discarded, for 30 - 3 = 27 records. for (int i = 0; i < 30; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertFalse(record.getDlsn().getEntryId() % 10 == 0); } fail("should have thrown"); @@ -1909,7 +1911,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // 3. ranges 6-10, 7-11, 8-12, 9-13 will be bad // And so on, so 5 records in each 10 will be discarded, for 50 good records. for (int i = 0; i < 50; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertFalse(record.getDlsn().getEntryId() % 10 == 0); } @@ -1946,7 +1948,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { // 2. range 1-8 will be good, but only contain 4 records // And so on for the next segment, so 4 records in each segment, for 12 good records for (int i = 0; i < 12; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertFalse(record.getDlsn().getEntryId() % 10 == 0); } @@ -1970,13 +1972,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { URI uri = createDLMURI("/" + name); ensureURICreated(uri); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(confLocal).uri(uri).build(); // use the pool DistributedLogManager dlm = namespace.openLog(name + "-pool"); AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); List<LogSegmentMetadata> segments = dlm.getLogSegments(); assertEquals(1, segments.size()); long ledgerId = segments.get(0).getLogSegmentId(); @@ -1995,7 +1997,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { Optional.of(dynConf), Optional.<StatsLogger>absent()); writer = dlm.startAsyncLogSegmentNonPartitioned(); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); segments = dlm.getLogSegments(); assertEquals(1, segments.size()); ledgerId = segments.get(0).getLogSegmentId(); @@ -2023,17 +2025,17 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager dlm = createNewDLM(confLocal, name); BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); - List<Future<DLSN>> writeFutures = Lists.newArrayList(); + List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayList(); for (int i = 0; i < 5; i++) { LogRecord record = DLMTestUtil.getLogRecordInstance(1L + i); writeFutures.add(writer.write(record)); } - List<Future<DLSN>> recordSetFutures = Lists.newArrayList(); + List<CompletableFuture<DLSN>> recordSetFutures = Lists.newArrayList(); // write another 5 records final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4); for (int i = 0; i < 5; i++) { LogRecord record = DLMTestUtil.getLogRecordInstance(6L + i); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise); recordSetFutures.add(writePromise); } @@ -2042,8 +2044,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { recordSetBuffer.get(data); LogRecord setRecord = new LogRecord(6L, data); setRecord.setRecordSet(); - Future<DLSN> writeRecordSetFuture = writer.write(setRecord); - writeRecordSetFuture.addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord); + writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN dlsn) { recordSetWriter.completeTransmit( @@ -2058,20 +2060,20 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } }); writeFutures.add(writeRecordSetFuture); - FutureUtils.result(writeRecordSetFuture); + Utils.ioResult(writeRecordSetFuture); // write last 5 records for (int i = 0; i < 5; i++) { LogRecord record = DLMTestUtil.getLogRecordInstance(11L + i); - Future<DLSN> writeFuture = writer.write(record); + CompletableFuture<DLSN> writeFuture = writer.write(record); writeFutures.add(writeFuture); // make sure get log record count returns the right count if (i == 0) { - FutureUtils.result(writeFuture); + Utils.ioResult(writeFuture); assertEquals(10, dlm.getLogRecordCount()); } } - List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures)); + List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writeFutures)); for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, i, 0L), writeResults.get(i)); @@ -2080,12 +2082,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i)); } - List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures)); + List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetFutures)); for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i)); } - FutureUtils.result(writer.flushAndCommit()); + Utils.ioResult(writer.flushAndCommit()); DistributedLogConfiguration readConf1 = new DistributedLogConfiguration(); readConf1.addConfiguration(confLocal); @@ -2094,7 +2096,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager readDLM1 = createNewDLM(readConf1, name); AsyncLogReader reader1 = readDLM1.getAsyncLogReader(DLSN.InitialDLSN); for (int i = 0; i < 15; i++) { - LogRecordWithDLSN record = FutureUtils.result(reader1.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader1.readNext()); if (i < 5) { assertEquals(new DLSN(1L, i, 0L), record.getDlsn()); assertEquals(1L + i, record.getTransactionId()); @@ -2118,7 +2120,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { DistributedLogManager readDLM2 = createNewDLM(readConf2, name); AsyncLogReader reader2 = readDLM2.getAsyncLogReader(DLSN.InitialDLSN); for (int i = 0; i < 11; i++) { - LogRecordWithDLSN record = FutureUtils.result(reader2.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader2.readNext()); LOG.info("Read record {}", record); if (i < 5) { assertEquals(new DLSN(1L, i, 0L), record.getDlsn()); @@ -2159,12 +2161,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { ensureURICreated(uri); DistributedLogManager dlm = createNewDLM(confLocal, name); - BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter()); writer.write(DLMTestUtil.getLogRecordInstance(1L)); - AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN)); try { - FutureUtils.result(reader.readNext()); + Utils.ioResult(reader.readNext()); fail("Should fail when stream is idle"); } catch (IdleReaderException ire) { // expected @@ -2191,11 +2193,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { ensureURICreated(uri); DistributedLogManager dlm = createNewDLM(confLocal, name); - BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter()); writer.write(DLMTestUtil.getLogRecordInstance(1L)); - AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); - LogRecordWithDLSN record = FutureUtils.result(reader.readNext()); + AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertEquals(1L, record.getTransactionId()); DLMTestUtil.verifyLogRecord(record);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java index dff0133..18e097f 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java @@ -23,10 +23,19 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.LogWriter; +import org.apache.distributedlog.api.MetadataAccessor; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.LogEmptyException; @@ -35,7 +44,6 @@ import org.apache.distributedlog.exceptions.LogReadException; import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore; import org.apache.distributedlog.io.Abortables; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; import org.apache.bookkeeper.client.BKException; @@ -54,12 +62,8 @@ import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException; import org.apache.distributedlog.metadata.LogMetadata; import org.apache.distributedlog.metadata.MetadataUpdater; import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.subscription.SubscriptionsStore; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; @@ -89,7 +93,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKLogWriteHandler blplm = dlm.createWriteHandler(true); assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } LogWriter writer = dlm.startLogSegmentNonPartitioned(); @@ -129,7 +133,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKLogWriteHandler blplm = dlm.createWriteHandler(true); assertNotNull(zkc.exists(blplm.completedLedgerZNode(1, 100, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } @Test(timeout = 60000) @@ -167,7 +171,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertNotNull( zkc.exists(blplm.completedLedgerZNode(start, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); @@ -263,14 +267,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { confLocal.setWriteLockEnabled(false); String name = "distrlog-two-writers-lock-disabled"; DistributedLogManager manager = createNewDLM(confLocal, name); - AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter()); - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L))); - AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter()); - FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L))); + AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter()); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L))); + AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter()); + Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L))); // write a record to writer 1 again try { - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(3L))); fail("Should fail writing record to writer 1 again as writer 2 took over the ownership"); } catch (BKTransmitException bkte) { assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode()); @@ -311,7 +315,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertNotNull( zkc.exists(blplm.completedLedgerZNode(start, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { @@ -394,7 +398,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertNotNull( zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); @@ -411,14 +415,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { AsyncLogReader asyncreader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); long numTrans = 0; - LogRecordWithDLSN record = Await.result(asyncreader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(asyncreader.readNext()); while (null != record) { DLMTestUtil.verifyLogRecord(record); numTrans++; if (numTrans >= (txid - 1)) { break; } - record = Await.result(asyncreader.readNext()); + record = Utils.ioResult(asyncreader.readNext()); } assertEquals((txid - 1), numTrans); Utils.close(asyncreader); @@ -459,12 +463,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { dlm.close(); URI uri = createDLMURI("/" + name); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); assertTrue(namespace.logExists(name)); assertFalse(namespace.logExists("non-existent-log")); URI nonExistentUri = createDLMURI("/" + "non-existent-ns"); - DistributedLogNamespace nonExistentNS = DistributedLogNamespaceBuilder.newBuilder() + Namespace nonExistentNS = NamespaceBuilder.newBuilder() .conf(conf).uri(nonExistentUri).build(); assertFalse(nonExistentNS.logExists(name)); @@ -508,31 +512,31 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { SubscriptionsStore store = dlm.getSubscriptionsStore(); // no data - assertEquals(Await.result(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound); - assertEquals(Await.result(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound); - assertEquals(Await.result(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound); + assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound); + assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound); + assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound); // empty - assertTrue(Await.result(store.getLastCommitPositions()).isEmpty()); + assertTrue(Utils.ioResult(store.getLastCommitPositions()).isEmpty()); // subscriber 0 advance - Await.result(store.advanceCommitPosition(subscriber0, commitPosition0)); - assertEquals(commitPosition0, Await.result(store.getLastCommitPosition(subscriber0))); - Map<String, DLSN> committedPositions = Await.result(store.getLastCommitPositions()); + Utils.ioResult(store.advanceCommitPosition(subscriber0, commitPosition0)); + assertEquals(commitPosition0, Utils.ioResult(store.getLastCommitPosition(subscriber0))); + Map<String, DLSN> committedPositions = Utils.ioResult(store.getLastCommitPositions()); assertEquals(1, committedPositions.size()); assertEquals(commitPosition0, committedPositions.get(subscriber0)); // subscriber 1 advance - Await.result(store.advanceCommitPosition(subscriber1, commitPosition1)); - assertEquals(commitPosition1, Await.result(store.getLastCommitPosition(subscriber1))); - committedPositions = Await.result(store.getLastCommitPositions()); + Utils.ioResult(store.advanceCommitPosition(subscriber1, commitPosition1)); + assertEquals(commitPosition1, Utils.ioResult(store.getLastCommitPosition(subscriber1))); + committedPositions = Utils.ioResult(store.getLastCommitPositions()); assertEquals(2, committedPositions.size()); assertEquals(commitPosition0, committedPositions.get(subscriber0)); assertEquals(commitPosition1, committedPositions.get(subscriber1)); // subscriber 2 advance - Await.result(store.advanceCommitPosition(subscriber2, commitPosition2)); - assertEquals(commitPosition2, Await.result(store.getLastCommitPosition(subscriber2))); - committedPositions = Await.result(store.getLastCommitPositions()); + Utils.ioResult(store.advanceCommitPosition(subscriber2, commitPosition2)); + assertEquals(commitPosition2, Utils.ioResult(store.getLastCommitPosition(subscriber2))); + committedPositions = Utils.ioResult(store.getLastCommitPositions()); assertEquals(3, committedPositions.size()); assertEquals(commitPosition0, committedPositions.get(subscriber0)); assertEquals(commitPosition1, committedPositions.get(subscriber1)); @@ -541,11 +545,11 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { // subscriber 2 advance again DistributedLogManager newDLM = createNewDLM(conf, name); SubscriptionsStore newStore = newDLM.getSubscriptionsStore(); - Await.result(newStore.advanceCommitPosition(subscriber2, commitPosition3)); + Utils.ioResult(newStore.advanceCommitPosition(subscriber2, commitPosition3)); newStore.close(); newDLM.close(); - committedPositions = Await.result(store.getLastCommitPositions()); + committedPositions = Utils.ioResult(store.getLastCommitPositions()); assertEquals(3, committedPositions.size()); assertEquals(commitPosition0, committedPositions.get(subscriber0)); assertEquals(commitPosition1, committedPositions.get(subscriber1)); @@ -570,13 +574,13 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } else { writer.markEndOfStream(); BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, DistributedLogConstants.MAX_TXID, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } } return txid; @@ -698,8 +702,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { @Test(timeout = 60000, expected = LogRecordTooLongException.class) public void testMaxLogRecSize() throws Exception { DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize"); - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); - FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString( + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); + Utils.ioResult(writer.write(new LogRecord(1L, DLMTestUtil.repeatString( DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes()))); } @@ -710,21 +714,21 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { confLocal.setOutputBufferSize(1024 * 1024); BKDistributedLogManager dlm = createNewDLM(confLocal, "distrlog-transmissionSize"); - AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter out = Utils.ioResult(dlm.openAsyncLogWriter()); boolean exceptionEncountered = false; byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2]; RAND.nextBytes(largePayload); try { LogRecord op = new LogRecord(1L, largePayload); - Future<DLSN> firstWriteFuture = out.write(op); + CompletableFuture<DLSN> firstWriteFuture = out.write(op); op = new LogRecord(2L, largePayload); // the second write will flush the first one, since we reached the maximum transmission size. out.write(op); - FutureUtils.result(firstWriteFuture); + Utils.ioResult(firstWriteFuture); } catch (LogRecordTooLongException exc) { exceptionEncountered = true; } finally { - FutureUtils.result(out.asyncClose()); + Utils.ioResult(out.asyncClose()); } assertFalse(exceptionEncountered); Abortables.abortQuietly(out); @@ -750,7 +754,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } LogReader reader = dlm.getInputStream(1); @@ -819,7 +823,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertNotNull( zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1, writer.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.asyncClose()); } BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); @@ -857,7 +861,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); - FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true)); + Utils.ioResult(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true)); dlm.registerListener(new LogSegmentListener() { @Override public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { @@ -931,7 +935,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { for (int i = 0; i < 10; i++) { LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); record.setControl(); - Await.result(writer.writeControlRecord(record)); + Utils.ioResult(writer.writeControlRecord(record)); } LOG.info("10 control records are written"); @@ -946,14 +950,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { LOG.info("Completed first log segment"); writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); LOG.info("Completed second log segment"); LOG.info("Writing another 10 control records"); for (int i = 1; i < 10; i++) { LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); record.setControl(); - Await.result(writer.write(record)); + Utils.ioResult(writer.write(record)); } assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN()); @@ -973,8 +977,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); DLMTestUtil.generateCompletedLogSegments(dlm, conf, 2, 10); - Future<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN); - Long count = Await.result(futureCount, Duration.fromSeconds(2)); + CompletableFuture<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN); + Long count = Utils.ioResult(futureCount, 2, TimeUnit.SECONDS); assertEquals(20, count.longValue()); writer.close(); @@ -986,7 +990,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { String baseName = testNames.getMethodName(); String streamName = "\0blah"; URI uri = createDLMURI("/" + baseName); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); DistributedLogManager dlm = null; @@ -1036,15 +1040,15 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j <= 10; j++) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - Future<DLSN> dlsn = writer.write(record); + CompletableFuture<DLSN> dlsn = writer.write(record); if (i == 1 && j == 2) { - truncDLSN = Await.result(dlsn); + truncDLSN = Utils.ioResult(dlsn); } else if (i == 2 && j == 3) { - beyondTruncDLSN = Await.result(dlsn); + beyondTruncDLSN = Utils.ioResult(dlsn); beyondTruncTxId = record.getTransactionId(); } else if (j == 10) { - Await.ready(dlsn); + Utils.ioResult(dlsn); } } @@ -1065,7 +1069,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater( confLocal, metadataStore); - FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L))); + Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(1L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); @@ -1088,7 +1092,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); - FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L))); + Utils.ioResult(updater.setLogSegmentActive(segmentList.get(1L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); @@ -1096,7 +1100,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { LOG.info("Read segments after marked first segment as active : {}", segmentList); updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); - FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L))); + Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(2L))); segmentList = DLMTestUtil.readLogSegments(zookeeperClient, LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); @@ -1109,7 +1113,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { boolean exceptionEncountered = false; try { for (int i = 0; i < 3 * 10; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); DLMTestUtil.verifyLargeLogRecord(record); assertEquals(expectedTxId, record.getTransactionId()); expectedTxId++; @@ -1122,10 +1126,10 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.setLogSegmentActive(segmentList.get(2L))); + Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L))); BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - Assert.assertTrue(Await.result(writer.truncate(truncDLSN))); + Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN))); BKLogWriteHandler handler = writer.getCachedWriteHandler(); List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR); for (LogSegmentMetadata segment: cachedSegments) { @@ -1164,7 +1168,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { { AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertTrue(record != null); assertEquals(truncDLSN, record.getDlsn()); Utils.close(reader); @@ -1190,7 +1194,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { { AsyncLogReader reader = dlm.getAsyncLogReader(beyondTruncDLSN); - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertTrue(record != null); assertEquals(beyondTruncDLSN, record.getDlsn()); Utils.close(reader); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java index e0f2bab..2078a88 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java @@ -28,14 +28,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Sets; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.LogWriter; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.exceptions.AlreadyClosedException; import org.apache.distributedlog.exceptions.InvalidStreamNameException; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.DLUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -95,8 +98,8 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { DistributedLogConfiguration newConf = new DistributedLogConfiguration(); newConf.addConfiguration(conf); newConf.setCreateStreamIfNotExists(false); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(newConf).uri(uri).build(); + Namespace namespace = NamespaceBuilder.newBuilder() + .conf(newConf).uri(uri).build(); DistributedLogManager dlm = namespace.openLog(logName); LogWriter writer; try { @@ -118,7 +121,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { newConf.addConfiguration(conf); newConf.setCreateStreamIfNotExists(false); String streamName = "test-stream"; - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(newConf).uri(uri).build(); DistributedLogManager dlm = namespace.openLog(streamName); LogWriter writer; @@ -148,7 +151,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { URI uri = createDLMURI("/" + runtime.getMethodName()); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); try { @@ -225,7 +228,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { public void testNamespaceListener() throws Exception { URI uri = createDLMURI("/" + runtime.getMethodName()); zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); final CountDownLatch[] latches = new CountDownLatch[3]; for (int i = 0; i < 3; i++) { @@ -268,7 +271,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { newConf.addConfiguration(conf); newConf.setCreateStreamIfNotExists(true); newConf.setZkAclId(un); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(newConf).uri(uri).build(); DistributedLogManager dlm = namespace.openLog(streamName); LogWriter writer = dlm.startLogSegmentNonPartitioned(); @@ -400,7 +403,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { @Test(timeout = 60000) public void testUseNamespaceAfterCloseShouldFailFast() throws Exception { URI uri = createDLMURI("/" + runtime.getMethodName()); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java index 854fd61..4915137 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java @@ -18,20 +18,21 @@ package org.apache.distributedlog; import com.google.common.base.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogWriter; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.logsegment.LogSegmentFilter; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.Await; import java.util.List; import java.util.ArrayList; import java.util.concurrent.TimeUnit; -import com.twitter.util.TimeoutException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -78,21 +79,21 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { DistributedLogManager dlm1 = createNewDLM(confLocal, dlName); long txid = 1; - ArrayList<Future<DLSN>> futures = new ArrayList<Future<DLSN>>(numEntriesPerSegment); + ArrayList<CompletableFuture<DLSN>> futures = new ArrayList<CompletableFuture<DLSN>>(numEntriesPerSegment); AsyncLogWriter out = dlm1.startAsyncLogSegmentNonPartitioned(); for (int eid = 0; eid < numEntriesPerSegment; ++eid) { futures.add(out.write(DLMTestUtil.getLogRecordInstance(txid))); ++txid; } - FutureUtils.result(Future.collect(futures)); + Utils.ioResult(FutureUtils.collect(futures)); // commit LogRecord controlRecord = new LogRecord(txid, DistributedLogConstants.CONTROL_RECORD_CONTENT); controlRecord.setControl(); - FutureUtils.result(out.write(controlRecord)); + Utils.ioResult(out.write(controlRecord)); DLSN last = dlm1.getLastDLSN(); assertEquals(new DLSN(1,9,0), last); - DLSN first = Await.result(dlm1.getFirstDLSNAsync()); + DLSN first = Utils.ioResult(dlm1.getFirstDLSNAsync()); assertEquals(new DLSN(1,0,0), first); Utils.close(out); } @@ -102,9 +103,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { String dlName = runtime.getMethodName(); BKDistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = dlm.createReadHandler(); - Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); + CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); try { - Await.result(futureRecord); + Utils.ioResult(futureRecord); fail("should have thrown exception"); } catch (LogNotFoundException ex) { } @@ -116,9 +117,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { BKDistributedLogManager dlm = createNewDLM(conf, dlName); DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 3); BKLogReadHandler readHandler = dlm.createReadHandler(); - Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); + CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); try { - LogRecordWithDLSN record = Await.result(futureRecord); + LogRecordWithDLSN record = Utils.ioResult(futureRecord); assertEquals(new DLSN(1, 0, 0), record.getDlsn()); } catch (Exception ex) { fail("should not have thrown exception: " + ex); @@ -133,11 +134,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0)); - Boolean success = Await.result(futureSuccess); + CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0)); + Boolean success = Utils.ioResult(futureSuccess); assertTrue(success); - Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); - LogRecordWithDLSN record = Await.result(futureRecord); + CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); + LogRecordWithDLSN record = Utils.ioResult(futureRecord); assertEquals(new DLSN(2, 0, 0), record.getDlsn()); } @@ -151,11 +152,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); // Only truncates at ledger boundary. - Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0)); - Boolean success = Await.result(futureSuccess); + CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0)); + Boolean success = Utils.ioResult(futureSuccess); assertTrue(success); - Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); - LogRecordWithDLSN record = Await.result(futureRecord); + CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); + LogRecordWithDLSN record = Utils.ioResult(futureRecord); assertEquals(new DLSN(2, 0, 0), record.getDlsn()); } @@ -164,10 +165,10 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { String dlName = runtime.getMethodName(); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN); try { - Await.result(count); + Utils.ioResult(count); fail("log is empty, should have returned log empty ex"); } catch (LogNotFoundException ex) { } @@ -179,9 +180,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { prepareLogSegmentsNonPartitioned(dlName, 11, 3); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN); - assertEquals(33, Await.result(count).longValue()); + assertEquals(33, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -190,11 +191,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { prepareLogSegmentsNonPartitioned(dlName, 11, 3); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(2, 0, 0)); - assertEquals(30, Await.result(count).longValue()); + assertEquals(30, Utils.ioResult(count).longValue()); count = readHandler.asyncGetLogRecordCount(new DLSN(3, 0, 0)); - assertEquals(27, Await.result(count).longValue()); + assertEquals(27, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -203,9 +204,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { prepareLogSegmentsNonPartitioned(dlName, 11, 3); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(12, 0, 0)); - assertEquals(0, Await.result(count).longValue()); + assertEquals(0, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -214,9 +215,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { prepareLogSegmentsNonPartitioned(dlName, 11, 3); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(11, 2, 0)); - assertEquals(1, Await.result(count).longValue()); + assertEquals(1, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -225,11 +226,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { prepareLogSegmentsNonPartitioned(dlName, 5, 10); DistributedLogManager dlm = createNewDLM(conf, dlName); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(3, 5, 0)); - assertEquals(25, Await.result(count).longValue()); + assertEquals(25, Utils.ioResult(count).longValue()); count = readHandler.asyncGetLogRecordCount(new DLSN(2, 5, 0)); - assertEquals(35, Await.result(count).longValue()); + assertEquals(35, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -239,9 +240,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 5, txid); txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 10, txid); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0)); - assertEquals(15, Await.result(count).longValue()); + assertEquals(15, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -251,9 +252,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 0, txid); txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 10, 0, txid); BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler(); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0)); - assertEquals(0, Await.result(count).longValue()); + assertEquals(0, Utils.ioResult(count).longValue()); } @Test(timeout = 60000) @@ -264,12 +265,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned(); int txid = 1; - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = FutureUtils.result( + List<LogSegmentMetadata> ledgerList = Utils.ioResult( readHandler.readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, @@ -279,9 +280,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { assertEquals(1, ledgerList.size()); assertTrue(ledgerList.get(0).isInProgress()); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0)); - assertEquals(2, Await.result(count).longValue()); + assertEquals(2, Utils.ioResult(count).longValue()); Utils.close(out); } @@ -294,12 +295,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { long txid = 1; txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, txid); AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned(); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = FutureUtils.result( + List<LogSegmentMetadata> ledgerList = Utils.ioResult( readHandler.readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, @@ -309,9 +310,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { assertFalse(ledgerList.get(0).isInProgress()); assertTrue(ledgerList.get(1).isInProgress()); - Future<Long> count = null; + CompletableFuture<Long> count = null; count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0)); - assertEquals(7, Await.result(count).longValue()); + assertEquals(7, Utils.ioResult(count).longValue()); Utils.close(out); } @@ -322,14 +323,14 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName); BKLogReadHandler readHandler = bkdlm.createReadHandler(); try { - Await.result(readHandler.lockStream()); + Utils.ioResult(readHandler.lockStream()); fail("Should fail lock stream if log not found"); } catch (LogNotFoundException ex) { } BKLogReadHandler subscriberReadHandler = bkdlm.createReadHandler(Optional.of("test-subscriber")); try { - Await.result(subscriberReadHandler.lockStream()); + Utils.ioResult(subscriberReadHandler.lockStream()); fail("Subscriber should fail lock stream if log not found"); } catch (LogNotFoundException ex) { // expected @@ -342,17 +343,17 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { BKDistributedLogManager bkdlm = createNewDLM(conf, streamName); DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - Await.result(readHandler.lockStream()); + Utils.ioResult(readHandler.lockStream()); // two subscribers could lock stream in parallel BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName); BKLogReadHandler s10Handler = bkdlm10.createReadHandler(Optional.of("s1")); - Await.result(s10Handler.lockStream()); + Utils.ioResult(s10Handler.lockStream()); BKDistributedLogManager bkdlm20 = createNewDLM(conf, streamName); BKLogReadHandler s20Handler = bkdlm20.createReadHandler(Optional.of("s2")); - Await.result(s20Handler.lockStream()); + Utils.ioResult(s20Handler.lockStream()); readHandler.asyncClose(); bkdlm.close(); @@ -368,19 +369,19 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { BKDistributedLogManager bkdlm = createNewDLM(conf, streamName); DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - Await.result(readHandler.lockStream()); + Utils.ioResult(readHandler.lockStream()); // same subscrbiers couldn't lock stream in parallel BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName); BKLogReadHandler s10Handler = bkdlm10.createReadHandler(Optional.of("s1")); - Await.result(s10Handler.lockStream()); + Utils.ioResult(s10Handler.lockStream()); BKDistributedLogManager bkdlm11 = createNewDLM(conf, streamName); BKLogReadHandler s11Handler = bkdlm11.createReadHandler(Optional.of("s1")); try { - Await.result(s11Handler.lockStream(), Duration.apply(10000, TimeUnit.MILLISECONDS)); + Utils.ioResult(s11Handler.lockStream(), 10000, TimeUnit.MILLISECONDS); fail("Should fail lock stream using same subscriber id"); } catch (OwnershipAcquireFailedException oafe) { // expected