http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java index 3919e92..a7d0d25 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java @@ -18,6 +18,27 @@ package org.apache.distributedlog; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.commons.lang3.tuple.Pair; import org.apache.distributedlog.callback.LogSegmentNamesListener; import org.apache.distributedlog.exceptions.LogEmptyException; import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; @@ -31,35 +52,12 @@ import org.apache.distributedlog.logsegment.PerStreamLogSegmentCache; import org.apache.distributedlog.logsegment.LogSegmentFilter; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; import org.apache.distributedlog.metadata.LogStreamMetadataStore; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - /** * The base class about log handler on managing log segments. * @@ -171,25 +169,27 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { return lockClientId; } - public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .addEventListener(new FutureEventListener<Void>() { + public CompletableFuture<LogRecordWithDLSN> asyncGetFirstLogRecord() { + final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>(); + streamMetadataStore.logExists( + logMetadata.getUri(), + logMetadata.getLogName() + ).whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + ).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) { if (ledgerList.getValue().isEmpty()) { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); + promise.completeExceptionally(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); return; } - Future<LogRecordWithDLSN> firstRecord = null; + CompletableFuture<LogRecordWithDLSN> firstRecord = null; for (LogSegmentMetadata ledger : ledgerList.getValue()) { if (!ledger.isTruncated() && (ledger.getRecordCount() > 0 || ledger.isInProgress())) { firstRecord = asyncReadFirstUserRecord(ledger, DLSN.InitialDLSN); @@ -197,43 +197,45 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { } } if (null != firstRecord) { - promise.become(firstRecord); + FutureUtils.proxyTo(firstRecord, promise); } else { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); + promise.completeExceptionally(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); } } @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }); } @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }); return promise; } - public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .addEventListener(new FutureEventListener<Void>() { + public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) { + final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>(); + streamMetadataStore.logExists( + logMetadata.getUri(), + logMetadata.getLogName() + ).whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { readLogSegmentsFromStore( LogSegmentMetadata.DESC_COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + ).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) { if (ledgerList.getValue().isEmpty()) { - promise.setException( + promise.completeExceptionally( new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); return; } @@ -247,49 +249,51 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }); } @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }); return promise; } private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> ledgerIter, - final Promise<LogRecordWithDLSN> promise, + final CompletableFuture<LogRecordWithDLSN> promise, final boolean fence, final boolean includeControlRecord, final boolean includeEndOfStream) { if (ledgerIter.hasNext()) { LogSegmentMetadata metadata = ledgerIter.next(); - asyncReadLastRecord(metadata, fence, includeControlRecord, includeEndOfStream).addEventListener( - new FutureEventListener<LogRecordWithDLSN>() { - @Override - public void onSuccess(LogRecordWithDLSN record) { - if (null == record) { - asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream); - } else { - promise.setValue(record); - } - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); + asyncReadLastRecord( + metadata, fence, includeControlRecord, includeEndOfStream + ).whenComplete( + new FutureEventListener<LogRecordWithDLSN>() { + @Override + public void onSuccess(LogRecordWithDLSN record) { + if (null == record) { + asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream); + } else { + promise.complete(record); } } + + @Override + public void onFailure(Throwable cause) { + promise.completeExceptionally(cause); + } + } ); } else { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); + promise.completeExceptionally(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); } } - private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) { + private CompletableFuture<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) { return ReadUtils.asyncReadFirstUserRecord( getFullyQualifiedName(), ledger, @@ -307,15 +311,17 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { * beginDLSN and the second denoted by endPosition. Its up to the caller to ensure that endPosition refers to * position in the same ledger as beginDLSN. */ - private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, final DLSN beginDLSN, final long endPosition) { - return asyncReadFirstUserRecord(ledger, beginDLSN).map(new Function<LogRecordWithDLSN, Long>() { - public Long apply(final LogRecordWithDLSN beginRecord) { - long recordCount = 0; - if (null != beginRecord) { - recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment(); - } - return recordCount; + private CompletableFuture<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, + final DLSN beginDLSN, + final long endPosition) { + return asyncReadFirstUserRecord( + ledger, beginDLSN + ).thenApply(beginRecord -> { + long recordCount = 0; + if (null != beginRecord) { + recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment(); } + return recordCount; }); } @@ -325,31 +331,29 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { * an interior entry. For the last entry, if it is inprogress, we need to recover it and find the last user * entry. */ - private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) { + private CompletableFuture<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) { if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) { - return asyncReadLastUserRecord(ledger).flatMap(new Function<LogRecordWithDLSN, Future<Long>>() { - public Future<Long> apply(final LogRecordWithDLSN endRecord) { + return asyncReadLastUserRecord(ledger).thenCompose( + (Function<LogRecordWithDLSN, CompletableFuture<Long>>) endRecord -> { if (null != endRecord) { - return asyncGetLogRecordCount(ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */); + return asyncGetLogRecordCount( + ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */); } else { - return Future.value((long) 0); + return FutureUtils.value((long) 0); } - } - }); + }); } else if (ledger.isInProgress()) { - return asyncReadLastUserRecord(ledger).map(new Function<LogRecordWithDLSN, Long>() { - public Long apply(final LogRecordWithDLSN endRecord) { - if (null != endRecord) { - return (long) endRecord.getLastPositionWithinLogSegment(); - } else { - return (long) 0; - } + return asyncReadLastUserRecord(ledger).thenApply(endRecord -> { + if (null != endRecord) { + return (long) endRecord.getLastPositionWithinLogSegment(); + } else { + return (long) 0; } }); } else if (ledger.isDLSNinThisSegment(beginDLSN)) { return asyncGetLogRecordCount(ledger, beginDLSN, ledger.getRecordCount() /* end position */); } else { - return Future.value((long) ledger.getRecordCount()); + return FutureUtils.value((long) ledger.getRecordCount()); } } @@ -359,29 +363,26 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { * @param beginDLSN dlsn marking the start of the range * @return the count of records present in the range */ - public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) { + public CompletableFuture<Long> asyncGetLogRecordCount(final DLSN beginDLSN) { return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .flatMap(new Function<Void, Future<Long>>() { - public Future<Long> apply(Void done) { + .thenCompose(new Function<Void, CompletableFuture<Long>>() { + public CompletableFuture<Long> apply(Void done) { return readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null - ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, Future<Long>>() { - public Future<Long> apply(Versioned<List<LogSegmentMetadata>> ledgerList) { + ).thenCompose(new Function<Versioned<List<LogSegmentMetadata>>, CompletableFuture<Long>>() { + public CompletableFuture<Long> apply(Versioned<List<LogSegmentMetadata>> ledgerList) { - List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.getValue().size()); + List<CompletableFuture<Long>> futureCounts = + Lists.newArrayListWithExpectedSize(ledgerList.getValue().size()); for (LogSegmentMetadata ledger : ledgerList.getValue()) { if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) { futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN)); } } - return Future.collect(futureCounts).map(new Function<List<Long>, Long>() { - public Long apply(List<Long> counts) { - return sum(counts); - } - }); + return FutureUtils.collect(futureCounts).thenApply(counts -> sum(counts)); } }); } @@ -397,15 +398,15 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { } @Override - public Future<Void> asyncAbort() { + public CompletableFuture<Void> asyncAbort() { return asyncClose(); } - public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) { + public CompletableFuture<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) { return asyncReadLastRecord(l, false, false, false); } - public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l, + public CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l, final boolean fence, final boolean includeControl, final boolean includeEndOfStream) { @@ -422,7 +423,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { numRecordsScanned, scheduler, entryStore - ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + ).whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN value) { recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); @@ -572,17 +573,17 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { * @param logSegmentNamesListener * @return future represents the result of log segments */ - public Future<Versioned<List<LogSegmentMetadata>>> readLogSegmentsFromStore( + public CompletableFuture<Versioned<List<LogSegmentMetadata>>> readLogSegmentsFromStore( final Comparator<LogSegmentMetadata> comparator, final LogSegmentFilter segmentFilter, final LogSegmentNamesListener logSegmentNamesListener) { - final Promise<Versioned<List<LogSegmentMetadata>>> readResult = - new Promise<Versioned<List<LogSegmentMetadata>>>(); + final CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult = + new CompletableFuture<Versioned<List<LogSegmentMetadata>>>(); metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), logSegmentNamesListener) - .addEventListener(new FutureEventListener<Versioned<List<String>>>() { + .whenComplete(new FutureEventListener<Versioned<List<String>>>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(readResult, cause); + readResult.completeExceptionally(cause); } @Override @@ -596,7 +597,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { protected void readLogSegmentsFromStore(final Versioned<List<String>> logSegmentNames, final Comparator<LogSegmentMetadata> comparator, final LogSegmentFilter segmentFilter, - final Promise<Versioned<List<LogSegmentMetadata>>> readResult) { + final CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult) { Set<String> segmentsReceived = new HashSet<String>(); segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue())); Set<String> segmentsAdded; @@ -619,12 +620,11 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { try { segmentList = getCachedLogSegments(comparator); } catch (UnexpectedException e) { - FutureUtils.setException(readResult, e); + readResult.completeExceptionally(e); return; } - FutureUtils.setValue(readResult, - new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion())); + readResult.complete(new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion())); return; } @@ -646,7 +646,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { continue; } metadataStore.getLogSegment(logSegmentPath) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + .whenComplete(new FutureEventListener<LogSegmentMetadata>() { @Override public void onSuccess(LogSegmentMetadata result) { @@ -666,7 +666,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { } else { // fail fast if (1 == numFailures.incrementAndGet()) { - FutureUtils.setException(readResult, cause); + readResult.completeExceptionally(cause); return; } } @@ -689,7 +689,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { private void completeReadLogSegmentsFromStore(final Set<String> removedSegments, final Map<String, LogSegmentMetadata> addedSegments, final Comparator<LogSegmentMetadata> comparator, - final Promise<Versioned<List<LogSegmentMetadata>>> readResult, + final CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult, final Version logSegmentNamesVersion, final AtomicInteger numChildren, final AtomicInteger numFailures) { @@ -705,11 +705,10 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { try { segmentList = getCachedLogSegments(comparator); } catch (UnexpectedException e) { - FutureUtils.setException(readResult, e); + readResult.completeExceptionally(e); return; } - FutureUtils.setValue(readResult, - new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion)); + readResult.complete(new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion)); } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java index c6e2e07..ff6b527 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java @@ -17,15 +17,22 @@ */ package org.apache.distributedlog; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; +import javax.annotation.Nullable; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.distributedlog.callback.LogSegmentListener; import org.apache.distributedlog.callback.LogSegmentNamesListener; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; @@ -40,29 +47,12 @@ import org.apache.distributedlog.lock.DistributedLock; import org.apache.distributedlog.logsegment.LogSegmentFilter; import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; import org.apache.distributedlog.metadata.LogStreamMetadataStore; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.Try; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import javax.annotation.Nullable; /** * Log Handler for Readers. @@ -112,7 +102,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { private final Optional<String> subscriberId; private DistributedLock readLock; - private Future<Void> lockAcquireFuture; + private CompletableFuture<Void> lockAcquireFuture; // notify the state change about the read handler protected final AsyncNotification readerStateNotification; @@ -166,31 +156,23 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { return logMetadataForReader.getReadLockPath(subscriberId); } - <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) { - scheduler.submit(new SafeRunnable() { - @Override - public void safeRun() { - promise.update(result); - } - }); - } - - Future<Void> checkLogStreamExists() { + CompletableFuture<Void> checkLogStreamExists() { return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()); } /** * Elective stream lock--readers are not required to acquire the lock before using the stream. */ - synchronized Future<Void> lockStream() { + synchronized CompletableFuture<Void> lockStream() { if (null == lockAcquireFuture) { lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId) - .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() { - @Override - public Future<Void> applyE(DistributedLock lock) throws Throwable { + .thenCompose(lock -> { + try { BKLogReadHandler.this.readLock = lock; LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath()); return acquireLockOnExecutorThread(lock); + } catch (LockingException le) { + return FutureUtils.exception(le); } }); } @@ -201,33 +183,31 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an * executor service thread. */ - Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { - final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire(); + CompletableFuture<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { + final CompletableFuture<? extends DistributedLock> acquireFuture = lock.asyncAcquire(); // The future we return must be satisfied on an executor service thread. If we simply // return the future returned by asyncAcquire, user callbacks may end up running in // the lock state executor thread, which will cause deadlocks and introduce latency // etc. - final Promise<Void> threadAcquirePromise = new Promise<Void>(); - threadAcquirePromise.setInterruptHandler(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - FutureUtils.cancel(acquireFuture); - return null; + final CompletableFuture<Void> threadAcquirePromise = new CompletableFuture<Void>(); + threadAcquirePromise.whenComplete((value, cause) -> { + if (cause instanceof CancellationException) { + acquireFuture.cancel(true); } }); - acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() { + acquireFuture.whenCompleteAsync(new FutureEventListener<DistributedLock>() { @Override public void onSuccess(DistributedLock lock) { LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath()); - satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null)); + threadAcquirePromise.complete(null); } @Override public void onFailure(Throwable cause) { LOG.info("failed to acquire readlock {} at {}", new Object[]{ getLockClientId(), getReadLockPath(), cause }); - satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause)); + threadAcquirePromise.completeExceptionally(cause); } }); return threadAcquirePromise; @@ -239,7 +219,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { void checkReadLock() throws DLIllegalStateException, LockingException { synchronized (this) { if ((null == lockAcquireFuture) || - (!lockAcquireFuture.isDefined())) { + (!lockAcquireFuture.isDone())) { throw new DLIllegalStateException("Attempt to check for lock before it has been acquired successfully"); } } @@ -247,27 +227,24 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { readLock.checkOwnership(); } - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { DistributedLock lockToClose; synchronized (this) { - if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) { - FutureUtils.cancel(lockAcquireFuture); + if (null != lockAcquireFuture && !lockAcquireFuture.isDone()) { + lockAcquireFuture.cancel(true); } lockToClose = readLock; } return Utils.closeSequence(scheduler, lockToClose) - .flatMap(new AbstractFunction1<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void result) { + .thenApply((value) -> { // unregister the log segment listener metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this); - return Future.Void(); - } - }); + return null; + }); } @Override - public Future<Void> asyncAbort() { + public CompletableFuture<Void> asyncAbort() { return asyncClose(); } @@ -277,18 +254,18 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { * * @return future represents the fetch result */ - Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() { - Promise<Versioned<List<LogSegmentMetadata>>> promise = - new Promise<Versioned<List<LogSegmentMetadata>>>(); + CompletableFuture<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() { + CompletableFuture<Versioned<List<LogSegmentMetadata>>> promise = + new CompletableFuture<Versioned<List<LogSegmentMetadata>>>(); asyncStartFetchLogSegments(promise); return promise; } - void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) { + void asyncStartFetchLogSegments(final CompletableFuture<Versioned<List<LogSegmentMetadata>>> promise) { readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, - this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + this).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { if (cause instanceof LogNotFoundException || @@ -298,7 +275,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { metadataException.compareAndSet(null, (IOException) cause); // notify the reader that read handler is in error state notifyReaderOnError(cause); - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); return; } scheduler.schedule(new Runnable() { @@ -312,7 +289,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { @Override public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { // no-op - FutureUtils.setValue(promise, segments); + FutureUtils.complete(promise, segments); } }); } @@ -332,9 +309,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { } } - Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise = - new Promise<Versioned<List<LogSegmentMetadata>>>(); - readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + CompletableFuture<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise = + new CompletableFuture<Versioned<List<LogSegmentMetadata>>>(); + readLogSegmentsPromise.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { if (cause instanceof LogNotFoundException || http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java index 6b60c77..a4016c8 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java @@ -20,6 +20,8 @@ package org.apache.distributedlog; import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -28,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import java.util.function.Function; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.EndOfStreamException; @@ -47,20 +50,15 @@ import org.apache.distributedlog.io.CompressionUtils; import org.apache.distributedlog.lock.DistributedLock; import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.distributedlog.logsegment.LogSegmentWriter; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import org.apache.distributedlog.stats.OpStatsListener; +import org.apache.distributedlog.common.stats.BroadCastStatsLogger; +import org.apache.distributedlog.common.stats.OpStatsListener; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.SafeQueueingFuturePool; +import org.apache.distributedlog.common.util.PermitLimiter; import org.apache.distributedlog.util.SimplePermitLimiter; -import org.apache.distributedlog.util.Sizable; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.FuturePool; -import com.twitter.util.Promise; +import org.apache.distributedlog.common.util.Sizable; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.BKException; @@ -73,10 +71,9 @@ import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import static com.google.common.base.Charsets.UTF_8; import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; @@ -146,7 +143,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null); final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null); private boolean enforceLock = true; - private Promise<Void> closeFuture = null; + private CompletableFuture<Void> closeFuture = null; private final boolean enableRecordCounts; private int positionWithinLogSegment = 0; private final long logSegmentSequenceNumber; @@ -170,28 +167,17 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private final OpStatsLogger addCompleteDeferredTime; private final Counter pendingWrites; - // add complete processing - private final SafeQueueingFuturePool<Void> addCompleteFuturePool; - // Functions - private final AbstractFunction1<Integer, Future<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC = - new AbstractFunction1<Integer, Future<Long>>() { - @Override - public Future<Long> apply(Integer transmitRc) { - if (BKException.Code.OK == transmitRc) { - return Future.value(getLastTxIdAcknowledged()); - } else { - return Future.exception(new BKTransmitException("Failed to transmit entry", transmitRc)); - } - } - }; - final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC = - new AbstractFunction1<Long, Future<Long>>() { - @Override - public Future<Long> apply(Long lastAckedTxId) { - return commit(); - } - }; + private final Function<Integer, CompletableFuture<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC = + transmitRc -> { + if (BKException.Code.OK == transmitRc) { + return FutureUtils.value(getLastTxIdAcknowledged()); + } else { + return FutureUtils.exception(new BKTransmitException("Failed to transmit entry", transmitRc)); + } + }; + final Function<Long, CompletableFuture<Long>> COMMIT_AFTER_FLUSH_FUNC = + lastAckedTxId -> commit(); private final AlertStatsLogger alertStatsLogger; private final WriteLimiter writeLimiter; @@ -341,11 +327,6 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } this.conf = conf; - if (null != scheduler) { - this.addCompleteFuturePool = new SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName)); - } else { - this.addCompleteFuturePool = null; - } assert(!this.immediateFlushEnabled || (null != this.scheduler)); this.lastTransmit = Stopwatch.createStarted(); } @@ -360,11 +341,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } @VisibleForTesting - FuturePool getFuturePool() { - if (null == scheduler) { - return null; - } - return scheduler.getFuturePool(streamName); + ScheduledExecutorService getFuturePool() { + return scheduler.chooseExecutor(streamName); } @VisibleForTesting @@ -471,21 +449,15 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return closeInternal(false); } @Override - public Future<Void> asyncAbort() { + public CompletableFuture<Void> asyncAbort() { return closeInternal(true); } - private void flushAddCompletes() { - if (null != addCompleteFuturePool) { - addCompleteFuturePool.close(); - } - } - private synchronized void abortPacket(BKTransmitPacket packet) { long numRecords = 0; if (null != packet) { @@ -495,7 +467,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (BKException.Code.OK == rc) { rc = BKException.Code.InterruptedException; } - Throwable reason = new WriteCancelledException(streamName, FutureUtils.transmitException(rc)); + Throwable reason = new WriteCancelledException(streamName, Utils.transmitException(rc)); recordSet.abortTransmit(reason); } LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, numRecords); @@ -509,21 +481,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } } - private synchronized long getPendingAddCompleteCount() { - if (null != addCompleteFuturePool) { - return addCompleteFuturePool.size(); - } else { - return 0; - } - } - - private Future<Void> closeInternal(boolean abort) { - Promise<Void> closePromise; + private CompletableFuture<Void> closeInternal(boolean abort) { + CompletableFuture<Void> closePromise; synchronized (this) { if (null != closeFuture) { return closeFuture; } - closePromise = closeFuture = new Promise<Void>(); + closePromise = closeFuture = new CompletableFuture<Void>(); } AtomicReference<Throwable> throwExc = new AtomicReference<Throwable>(null); @@ -533,7 +497,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void closeInternal(final boolean abort, final AtomicReference<Throwable> throwExc, - final Promise<Void> closePromise) { + final CompletableFuture<Void> closePromise) { // clean stats resources this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge); this.writeLimiter.close(); @@ -560,7 +524,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (!abort && !isLogSegmentInError()) { this.enforceLock = false; LOG.info("Flushing before closing log segment {}", getFullyQualifiedLogSegment()); - flushAndCommit().addEventListener(new FutureEventListener<Long>() { + flushAndCommit().whenComplete(new FutureEventListener<Long>() { @Override public void onSuccess(Long value) { abortTransmitPacketOnClose(abort, throwExc, closePromise); @@ -580,11 +544,11 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void abortTransmitPacketOnClose(final boolean abort, final AtomicReference<Throwable> throwExc, - final Promise<Void> closePromise) { + final CompletableFuture<Void> closePromise) { LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" + - " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {} addCompletesPending = {}", + " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {}", new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(), - outstandingTransmits.get(), getWritesPendingTransmit(), getPendingAddCompleteCount()}); + outstandingTransmits.get(), getWritesPendingTransmit()}); // Save the current packet to reset, leave a new empty packet to avoid a race with // addCompleteDeferredProcessing. @@ -602,7 +566,6 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz packetPreviousSaved.addTransmitCompleteListener(new FutureEventListener<Integer>() { @Override public void onSuccess(Integer transmitResult) { - flushAddCompletes(); abortPacket(packetCurrentSaved); } @Override @@ -620,7 +583,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void closeLedgerOnClose(final boolean abort, final AtomicReference<Throwable> throwExc, - final Promise<Void> closePromise) { + final CompletableFuture<Void> closePromise) { // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback. if (null == throwExc.get() && !isLogSegmentInError()) { // Synchronous closing the ledger handle, if we couldn't close a ledger handle successfully. @@ -644,16 +607,16 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void completeClosePromise(final boolean abort, final AtomicReference<Throwable> throwExc, - final Promise<Void> closePromise) { + final CompletableFuture<Void> closePromise) { // If add entry failed because of closing ledger above, we don't need to fail the close operation if (!abort && null == throwExc.get() && shouldFailCompleteLogSegment()) { throwExc.set(new BKTransmitException("Closing an errored stream : ", transmitResult.get())); } if (null == throwExc.get()) { - FutureUtils.setValue(closePromise, null); + FutureUtils.complete(closePromise, null); } else { - FutureUtils.setException(closePromise, throwExc.get()); + FutureUtils.completeExceptionally(closePromise, throwExc.get()); } } @@ -664,12 +627,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } @Override - synchronized public Future<DLSN> asyncWrite(LogRecord record) { + synchronized public CompletableFuture<DLSN> asyncWrite(LogRecord record) { return asyncWrite(record, true); } - synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean flush) { - Future<DLSN> result = null; + synchronized public CompletableFuture<DLSN> asyncWrite(LogRecord record, boolean flush) { + CompletableFuture<DLSN> result = null; try { if (record.isControl()) { // we don't pack control records with user records together @@ -677,7 +640,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz try { transmit(); } catch (IOException ioe) { - return Future.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe)); + return FutureUtils.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe)); } result = writeControlLogRecord(record); transmit(); @@ -685,7 +648,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz result = writeUserRecord(record); if (!isDurableWriteEnabled) { // we have no idea about the DLSN if durability is turned off. - result = Future.value(DLSN.InvalidDLSN); + result = FutureUtils.value(DLSN.InvalidDLSN); } if (flush) { flushIfNeeded(); @@ -697,7 +660,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (null != result) { LOG.error("Overriding first result with flush failure {}", result); } - result = Future.exception(ioe); + result = FutureUtils.exception(ioe); // Flush to ensure any prev. writes with flush=false are flushed despite failure. flushIfNeededNoThrow(); @@ -705,7 +668,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz return result; } - synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws IOException { + synchronized private CompletableFuture<DLSN> writeUserRecord(LogRecord record) throws IOException { if (null != closeFuture) { throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(BKException.Code.LedgerClosedException)); } @@ -737,7 +700,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz // Internally generated log records don't increment the count // writeInternal will always set a count regardless of whether it was // incremented or not. - Future<DLSN> future = null; + CompletableFuture<DLSN> future = null; try { // increment the position for the record to write // if the record is failed to write, it would be decremented. @@ -759,12 +722,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } // Track outstanding requests and return the future. - return future.ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - pendingWrites.dec(); - writeLimiter.release(); - return null; - } + return FutureUtils.ensure(future, () -> { + pendingWrites.dec(); + writeLimiter.release(); }); } @@ -777,7 +737,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz (transmitResult.get() != BKException.Code.LedgerClosedException); } - synchronized public Future<DLSN> writeInternal(LogRecord record) + synchronized public CompletableFuture<DLSN> writeInternal(LogRecord record) throws LogRecordTooLongException, LockingException, BKTransmitException, WriteException, InvalidEnvelopedEntryException { int logRecordSize = record.getPersistentSize(); @@ -802,8 +762,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz record.setPositionWithinLogSegment(positionWithinLogSegment); } - Promise<DLSN> writePromise = new Promise<DLSN>(); - writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime)); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); + writePromise.whenComplete(new OpStatsListener<DLSN>(writeTime)); recordSetWriter.writeRecord(record, writePromise); if (record.getTransactionId() < lastTxId) { @@ -818,7 +778,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz return writePromise; } - synchronized private Future<DLSN> writeControlLogRecord() + synchronized private CompletableFuture<DLSN> writeControlLogRecord() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException, LogRecordTooLongException { LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT); @@ -826,7 +786,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz return writeControlLogRecord(controlRec); } - synchronized private Future<DLSN> writeControlLogRecord(LogRecord record) + synchronized private CompletableFuture<DLSN> writeControlLogRecord(LogRecord record) throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException, LogRecordTooLongException { return writeInternal(record); @@ -851,12 +811,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz * as read-only in the metadata. No appends to the * stream will be allowed after this point */ - public Future<Long> markEndOfStream() { + public CompletableFuture<Long> markEndOfStream() { synchronized (this) { try { writeEndOfStreamMarker(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } streamEnded = true; } @@ -905,60 +865,60 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } @Override - public synchronized Future<Long> flush() { + public synchronized CompletableFuture<Long> flush() { try { checkStateBeforeTransmit(); } catch (WriteException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - Future<Integer> transmitFuture; + CompletableFuture<Integer> transmitFuture; try { transmitFuture = transmit(); } catch (BKTransmitException e) { - return Future.exception(e); + return FutureUtils.exception(e); } catch (LockingException e) { - return Future.exception(e); + return FutureUtils.exception(e); } catch (WriteException e) { - return Future.exception(e); + return FutureUtils.exception(e); } catch (InvalidEnvelopedEntryException e) { - return Future.exception(e); + return FutureUtils.exception(e); } if (null == transmitFuture) { if (null != packetPrevious) { transmitFuture = packetPrevious.getTransmitFuture(); } else { - return Future.value(getLastTxIdAcknowledged()); + return FutureUtils.value(getLastTxIdAcknowledged()); } } - return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC); + return transmitFuture.thenCompose(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC); } @Override - public synchronized Future<Long> commit() { + public synchronized CompletableFuture<Long> commit() { // we don't pack control records with user records together // so transmit current output buffer if possible - Future<Integer> transmitFuture; + CompletableFuture<Integer> transmitFuture; try { try { transmitFuture = transmit(); } catch (IOException ioe) { - return Future.exception(ioe); + return FutureUtils.exception(ioe); } if (null == transmitFuture) { writeControlLogRecord(); return flush(); } } catch (IOException ioe) { - return Future.exception(ioe); + return FutureUtils.exception(ioe); } - return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC); + return transmitFuture.thenCompose(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC); } - Future<Long> flushAndCommit() { - return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC); + CompletableFuture<Long> flushAndCommit() { + return flush().thenCompose(COMMIT_AFTER_FLUSH_FUNC); } void flushIfNeededNoThrow() { @@ -1054,7 +1014,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz * @throws WriteException if failed to create the envelope for the data to transmit * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry */ - private Future<Integer> transmit() + private CompletableFuture<Integer> transmit() throws BKTransmitException, LockingException, WriteException, InvalidEnvelopedEntryException { EntryBuffer recordSetToTransmit; transmitLock.lock(); @@ -1183,10 +1143,11 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } } - if (null != addCompleteFuturePool) { + if (null != scheduler) { final Stopwatch queuedTime = Stopwatch.createStarted(); - addCompleteFuturePool.apply(new Function0<Void>() { - public Void apply() { + scheduler.submit(streamName, new Callable<Void>() { + @Override + public Void call() { final Stopwatch deferredTime = Stopwatch.createStarted(); addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS)); addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get()); @@ -1198,7 +1159,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)", fullyQualifiedLogSegment, entryId, rc); } - }).addEventListener(new FutureEventListener<Void>() { + }).whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void done) { } @@ -1278,7 +1239,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (BKException.Code.OK == transmitResult.get()) { recordSet.completeTransmit(logSegmentSequenceNumber, entryId); } else { - recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get())); + recordSet.abortTransmit(Utils.transmitException(transmitResult.get())); } if (cancelPendingPromises) { @@ -1292,7 +1253,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } packetCurrentSaved.getRecordSet().abortTransmit( new WriteCancelledException(streamName, - FutureUtils.transmitException(transmitResult.get()))); + Utils.transmitException(transmitResult.get()))); } }