http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index 3919e92..a7d0d25 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -18,6 +18,27 @@
 package org.apache.distributedlog;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.callback.LogSegmentNamesListener;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
@@ -31,35 +52,12 @@ import org.apache.distributedlog.logsegment.PerStreamLogSegmentCache;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * The base class about log handler on managing log segments.
  *
@@ -171,25 +169,27 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
         return lockClientId;
     }
 
-    public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
-        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
-        streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
-                .addEventListener(new FutureEventListener<Void>() {
+    public CompletableFuture<LogRecordWithDLSN> asyncGetFirstLogRecord() {
+        final CompletableFuture<LogRecordWithDLSN> promise = new 
CompletableFuture<LogRecordWithDLSN>();
+        streamMetadataStore.logExists(
+            logMetadata.getUri(),
+            logMetadata.getLogName()
+        ).whenComplete(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
                         LogSegmentMetadata.COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
                         null
-                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                ).whenComplete(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
 
                     @Override
                     public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
                         if (ledgerList.getValue().isEmpty()) {
-                            promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
+                            promise.completeExceptionally(new 
LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
                             return;
                         }
-                        Future<LogRecordWithDLSN> firstRecord = null;
+                        CompletableFuture<LogRecordWithDLSN> firstRecord = 
null;
                         for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
                             if (!ledger.isTruncated() && 
(ledger.getRecordCount() > 0 || ledger.isInProgress())) {
                                 firstRecord = asyncReadFirstUserRecord(ledger, 
DLSN.InitialDLSN);
@@ -197,43 +197,45 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
                             }
                         }
                         if (null != firstRecord) {
-                            promise.become(firstRecord);
+                            FutureUtils.proxyTo(firstRecord, promise);
                         } else {
-                            promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
+                            promise.completeExceptionally(new 
LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
                         }
                     }
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        promise.setException(cause);
+                        promise.completeExceptionally(cause);
                     }
                 });
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                promise.setException(cause);
+                promise.completeExceptionally(cause);
             }
         });
         return promise;
     }
 
-    public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean 
recover, final boolean includeEndOfStream) {
-        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
-        streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
-                .addEventListener(new FutureEventListener<Void>() {
+    public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync(final 
boolean recover, final boolean includeEndOfStream) {
+        final CompletableFuture<LogRecordWithDLSN> promise = new 
CompletableFuture<LogRecordWithDLSN>();
+        streamMetadataStore.logExists(
+            logMetadata.getUri(),
+            logMetadata.getLogName()
+        ).whenComplete(new FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
                 readLogSegmentsFromStore(
                         LogSegmentMetadata.DESC_COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
                         null
-                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                ).whenComplete(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
 
                     @Override
                     public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
                         if (ledgerList.getValue().isEmpty()) {
-                            promise.setException(
+                            promise.completeExceptionally(
                                     new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
                             return;
                         }
@@ -247,49 +249,51 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        promise.setException(cause);
+                        promise.completeExceptionally(cause);
                     }
                 });
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                promise.setException(cause);
+                promise.completeExceptionally(cause);
             }
         });
         return promise;
     }
 
     private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> 
ledgerIter,
-                                       final Promise<LogRecordWithDLSN> 
promise,
+                                       final 
CompletableFuture<LogRecordWithDLSN> promise,
                                        final boolean fence,
                                        final boolean includeControlRecord,
                                        final boolean includeEndOfStream) {
         if (ledgerIter.hasNext()) {
             LogSegmentMetadata metadata = ledgerIter.next();
-            asyncReadLastRecord(metadata, fence, includeControlRecord, 
includeEndOfStream).addEventListener(
-                    new FutureEventListener<LogRecordWithDLSN>() {
-                        @Override
-                        public void onSuccess(LogRecordWithDLSN record) {
-                            if (null == record) {
-                                asyncGetLastLogRecord(ledgerIter, promise, 
fence, includeControlRecord, includeEndOfStream);
-                            } else {
-                                promise.setValue(record);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            promise.setException(cause);
+            asyncReadLastRecord(
+                metadata, fence, includeControlRecord, includeEndOfStream
+            ).whenComplete(
+                new FutureEventListener<LogRecordWithDLSN>() {
+                    @Override
+                    public void onSuccess(LogRecordWithDLSN record) {
+                        if (null == record) {
+                            asyncGetLastLogRecord(ledgerIter, promise, fence, 
includeControlRecord, includeEndOfStream);
+                        } else {
+                            promise.complete(record);
                         }
                     }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                                                         
promise.completeExceptionally(cause);
+                                                                               
               }
+                }
             );
         } else {
-            promise.setException(new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
+            promise.completeExceptionally(new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
         }
     }
 
-    private Future<LogRecordWithDLSN> 
asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
+    private CompletableFuture<LogRecordWithDLSN> 
asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
         return ReadUtils.asyncReadFirstUserRecord(
                 getFullyQualifiedName(),
                 ledger,
@@ -307,15 +311,17 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      * beginDLSN and the second denoted by endPosition. Its up to the caller 
to ensure that endPosition refers to
      * position in the same ledger as beginDLSN.
      */
-    private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, 
final DLSN beginDLSN, final long endPosition) {
-        return asyncReadFirstUserRecord(ledger, beginDLSN).map(new 
Function<LogRecordWithDLSN, Long>() {
-            public Long apply(final LogRecordWithDLSN beginRecord) {
-                long recordCount = 0;
-                if (null != beginRecord) {
-                    recordCount = endPosition + 1 - 
beginRecord.getLastPositionWithinLogSegment();
-                }
-                return recordCount;
+    private CompletableFuture<Long> asyncGetLogRecordCount(LogSegmentMetadata 
ledger,
+                                                           final DLSN 
beginDLSN,
+                                                           final long 
endPosition) {
+        return asyncReadFirstUserRecord(
+            ledger, beginDLSN
+        ).thenApply(beginRecord -> {
+            long recordCount = 0;
+            if (null != beginRecord) {
+                recordCount = endPosition + 1 - 
beginRecord.getLastPositionWithinLogSegment();
             }
+            return recordCount;
         });
     }
 
@@ -325,31 +331,29 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      * an interior entry. For the last entry, if it is inprogress, we need to 
recover it and find the last user
      * entry.
      */
-    private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata 
ledger, final DLSN beginDLSN) {
+    private CompletableFuture<Long> asyncGetLogRecordCount(final 
LogSegmentMetadata ledger, final DLSN beginDLSN) {
         if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) {
-            return asyncReadLastUserRecord(ledger).flatMap(new 
Function<LogRecordWithDLSN, Future<Long>>() {
-                public Future<Long> apply(final LogRecordWithDLSN endRecord) {
+            return asyncReadLastUserRecord(ledger).thenCompose(
+                (Function<LogRecordWithDLSN, CompletableFuture<Long>>) 
endRecord -> {
                     if (null != endRecord) {
-                        return asyncGetLogRecordCount(ledger, beginDLSN, 
endRecord.getLastPositionWithinLogSegment() /* end position */);
+                        return asyncGetLogRecordCount(
+                            ledger, beginDLSN, 
endRecord.getLastPositionWithinLogSegment() /* end position */);
                     } else {
-                        return Future.value((long) 0);
+                        return FutureUtils.value((long) 0);
                     }
-                }
-            });
+                });
         } else if (ledger.isInProgress()) {
-            return asyncReadLastUserRecord(ledger).map(new 
Function<LogRecordWithDLSN, Long>() {
-                public Long apply(final LogRecordWithDLSN endRecord) {
-                    if (null != endRecord) {
-                        return (long) 
endRecord.getLastPositionWithinLogSegment();
-                    } else {
-                        return (long) 0;
-                    }
+            return asyncReadLastUserRecord(ledger).thenApply(endRecord -> {
+                if (null != endRecord) {
+                    return (long) endRecord.getLastPositionWithinLogSegment();
+                } else {
+                    return (long) 0;
                 }
             });
         } else if (ledger.isDLSNinThisSegment(beginDLSN)) {
             return asyncGetLogRecordCount(ledger, beginDLSN, 
ledger.getRecordCount() /* end position */);
         } else {
-            return Future.value((long) ledger.getRecordCount());
+            return FutureUtils.value((long) ledger.getRecordCount());
         }
     }
 
@@ -359,29 +363,26 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      * @param beginDLSN dlsn marking the start of the range
      * @return the count of records present in the range
      */
-    public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
+    public CompletableFuture<Long> asyncGetLogRecordCount(final DLSN 
beginDLSN) {
         return streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName())
-                .flatMap(new Function<Void, Future<Long>>() {
-            public Future<Long> apply(Void done) {
+                .thenCompose(new Function<Void, CompletableFuture<Long>>() {
+            public CompletableFuture<Long> apply(Void done) {
 
                 return readLogSegmentsFromStore(
                         LogSegmentMetadata.COMPARATOR,
                         LogSegmentFilter.DEFAULT_FILTER,
                         null
-                ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, 
Future<Long>>() {
-                    public Future<Long> 
apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
+                ).thenCompose(new 
Function<Versioned<List<LogSegmentMetadata>>, CompletableFuture<Long>>() {
+                    public CompletableFuture<Long> 
apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
 
-                        List<Future<Long>> futureCounts = new 
ArrayList<Future<Long>>(ledgerList.getValue().size());
+                        List<CompletableFuture<Long>> futureCounts =
+                          
Lists.newArrayListWithExpectedSize(ledgerList.getValue().size());
                         for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
                             if (ledger.getLogSegmentSequenceNumber() >= 
beginDLSN.getLogSegmentSequenceNo()) {
                                 
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
                             }
                         }
-                        return Future.collect(futureCounts).map(new 
Function<List<Long>, Long>() {
-                            public Long apply(List<Long> counts) {
-                                return sum(counts);
-                            }
-                        });
+                        return 
FutureUtils.collect(futureCounts).thenApply(counts -> sum(counts));
                     }
                 });
             }
@@ -397,15 +398,15 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     }
 
     @Override
-    public Future<Void> asyncAbort() {
+    public CompletableFuture<Void> asyncAbort() {
         return asyncClose();
     }
 
-    public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final 
LogSegmentMetadata l) {
+    public CompletableFuture<LogRecordWithDLSN> asyncReadLastUserRecord(final 
LogSegmentMetadata l) {
         return asyncReadLastRecord(l, false, false, false);
     }
 
-    public Future<LogRecordWithDLSN> asyncReadLastRecord(final 
LogSegmentMetadata l,
+    public CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord(final 
LogSegmentMetadata l,
                                                          final boolean fence,
                                                          final boolean 
includeControl,
                                                          final boolean 
includeEndOfStream) {
@@ -422,7 +423,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
                 numRecordsScanned,
                 scheduler,
                 entryStore
-        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+        ).whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
             @Override
             public void onSuccess(LogRecordWithDLSN value) {
                 
recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
@@ -572,17 +573,17 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
      * @param logSegmentNamesListener
      * @return future represents the result of log segments
      */
-    public Future<Versioned<List<LogSegmentMetadata>>> 
readLogSegmentsFromStore(
+    public CompletableFuture<Versioned<List<LogSegmentMetadata>>> 
readLogSegmentsFromStore(
             final Comparator<LogSegmentMetadata> comparator,
             final LogSegmentFilter segmentFilter,
             final LogSegmentNamesListener logSegmentNamesListener) {
-        final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        final CompletableFuture<Versioned<List<LogSegmentMetadata>>> 
readResult =
+                new CompletableFuture<Versioned<List<LogSegmentMetadata>>>();
         metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), 
logSegmentNamesListener)
-                .addEventListener(new 
FutureEventListener<Versioned<List<String>>>() {
+                .whenComplete(new 
FutureEventListener<Versioned<List<String>>>() {
                     @Override
                     public void onFailure(Throwable cause) {
-                        FutureUtils.setException(readResult, cause);
+                        readResult.completeExceptionally(cause);
                     }
 
                     @Override
@@ -596,7 +597,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     protected void readLogSegmentsFromStore(final Versioned<List<String>> 
logSegmentNames,
                                             final 
Comparator<LogSegmentMetadata> comparator,
                                             final LogSegmentFilter 
segmentFilter,
-                                            final 
Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
+                                            final 
CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult) {
         Set<String> segmentsReceived = new HashSet<String>();
         
segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
         Set<String> segmentsAdded;
@@ -619,12 +620,11 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
             try {
                 segmentList = getCachedLogSegments(comparator);
             } catch (UnexpectedException e) {
-                FutureUtils.setException(readResult, e);
+                readResult.completeExceptionally(e);
                 return;
             }
 
-            FutureUtils.setValue(readResult,
-                    new Versioned<List<LogSegmentMetadata>>(segmentList, 
logSegmentNames.getVersion()));
+            readResult.complete(new 
Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion()));
             return;
         }
 
@@ -646,7 +646,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
                 continue;
             }
             metadataStore.getLogSegment(logSegmentPath)
-                    .addEventListener(new 
FutureEventListener<LogSegmentMetadata>() {
+                    .whenComplete(new 
FutureEventListener<LogSegmentMetadata>() {
 
                         @Override
                         public void onSuccess(LogSegmentMetadata result) {
@@ -666,7 +666,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
                             } else {
                                 // fail fast
                                 if (1 == numFailures.incrementAndGet()) {
-                                    FutureUtils.setException(readResult, 
cause);
+                                    readResult.completeExceptionally(cause);
                                     return;
                                 }
                             }
@@ -689,7 +689,7 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     private void completeReadLogSegmentsFromStore(final Set<String> 
removedSegments,
                                                   final Map<String, 
LogSegmentMetadata> addedSegments,
                                                   final 
Comparator<LogSegmentMetadata> comparator,
-                                                  final 
Promise<Versioned<List<LogSegmentMetadata>>> readResult,
+                                                  final 
CompletableFuture<Versioned<List<LogSegmentMetadata>>> readResult,
                                                   final Version 
logSegmentNamesVersion,
                                                   final AtomicInteger 
numChildren,
                                                   final AtomicInteger 
numFailures) {
@@ -705,11 +705,10 @@ abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
         try {
             segmentList = getCachedLogSegments(comparator);
         } catch (UnexpectedException e) {
-            FutureUtils.setException(readResult, e);
+            readResult.completeExceptionally(e);
             return;
         }
-        FutureUtils.setValue(readResult,
-            new Versioned<List<LogSegmentMetadata>>(segmentList, 
logSegmentNamesVersion));
+        readResult.complete(new 
Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index c6e2e07..ff6b527 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -17,15 +17,22 @@
  */
 package org.apache.distributedlog;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.callback.LogSegmentNamesListener;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -40,29 +47,12 @@ import org.apache.distributedlog.lock.DistributedLock;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
 
 /**
  * Log Handler for Readers.
@@ -112,7 +102,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
 
     private final Optional<String> subscriberId;
     private DistributedLock readLock;
-    private Future<Void> lockAcquireFuture;
+    private CompletableFuture<Void> lockAcquireFuture;
 
     // notify the state change about the read handler
     protected final AsyncNotification readerStateNotification;
@@ -166,31 +156,23 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
         return logMetadataForReader.getReadLockPath(subscriberId);
     }
 
-    <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> 
result) {
-        scheduler.submit(new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                promise.update(result);
-            }
-        });
-    }
-
-    Future<Void> checkLogStreamExists() {
+    CompletableFuture<Void> checkLogStreamExists() {
         return streamMetadataStore.logExists(logMetadata.getUri(), 
logMetadata.getLogName());
     }
 
     /**
      * Elective stream lock--readers are not required to acquire the lock 
before using the stream.
      */
-    synchronized Future<Void> lockStream() {
+    synchronized CompletableFuture<Void> lockStream() {
         if (null == lockAcquireFuture) {
             lockAcquireFuture = 
streamMetadataStore.createReadLock(logMetadataForReader, subscriberId)
-                    .flatMap(new ExceptionalFunction<DistributedLock, 
Future<Void>>() {
-                        @Override
-                        public Future<Void> applyE(DistributedLock lock) 
throws Throwable {
+                    .thenCompose(lock -> {
+                        try {
                             BKLogReadHandler.this.readLock = lock;
                             LOG.info("acquiring readlock {} at {}", 
getLockClientId(), getReadLockPath());
                             return acquireLockOnExecutorThread(lock);
+                        } catch (LockingException le) {
+                            return FutureUtils.exception(le);
                         }
                     });
         }
@@ -201,33 +183,31 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
      * Begin asynchronous lock acquire, but ensure that the returned future is 
satisfied on an
      * executor service thread.
      */
-    Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws 
LockingException {
-        final Future<? extends DistributedLock> acquireFuture = 
lock.asyncAcquire();
+    CompletableFuture<Void> acquireLockOnExecutorThread(DistributedLock lock) 
throws LockingException {
+        final CompletableFuture<? extends DistributedLock> acquireFuture = 
lock.asyncAcquire();
 
         // The future we return must be satisfied on an executor service 
thread. If we simply
         // return the future returned by asyncAcquire, user callbacks may end 
up running in
         // the lock state executor thread, which will cause deadlocks and 
introduce latency
         // etc.
-        final Promise<Void> threadAcquirePromise = new Promise<Void>();
-        threadAcquirePromise.setInterruptHandler(new Function<Throwable, 
BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                FutureUtils.cancel(acquireFuture);
-                return null;
+        final CompletableFuture<Void> threadAcquirePromise = new 
CompletableFuture<Void>();
+        threadAcquirePromise.whenComplete((value, cause) -> {
+            if (cause instanceof CancellationException) {
+                acquireFuture.cancel(true);
             }
         });
-        acquireFuture.addEventListener(new 
FutureEventListener<DistributedLock>() {
+        acquireFuture.whenCompleteAsync(new 
FutureEventListener<DistributedLock>() {
             @Override
             public void onSuccess(DistributedLock lock) {
                 LOG.info("acquired readlock {} at {}", getLockClientId(), 
getReadLockPath());
-                satisfyPromiseAsync(threadAcquirePromise, new 
Return<Void>(null));
+                threadAcquirePromise.complete(null);
             }
 
             @Override
             public void onFailure(Throwable cause) {
                 LOG.info("failed to acquire readlock {} at {}",
                         new Object[]{ getLockClientId(), getReadLockPath(), 
cause });
-                satisfyPromiseAsync(threadAcquirePromise, new 
Throw<Void>(cause));
+                threadAcquirePromise.completeExceptionally(cause);
             }
         });
         return threadAcquirePromise;
@@ -239,7 +219,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
     void checkReadLock() throws DLIllegalStateException, LockingException {
         synchronized (this) {
             if ((null == lockAcquireFuture) ||
-                (!lockAcquireFuture.isDefined())) {
+                (!lockAcquireFuture.isDone())) {
                 throw new DLIllegalStateException("Attempt to check for lock 
before it has been acquired successfully");
             }
         }
@@ -247,27 +227,24 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
         readLock.checkOwnership();
     }
 
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         DistributedLock lockToClose;
         synchronized (this) {
-            if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) {
-                FutureUtils.cancel(lockAcquireFuture);
+            if (null != lockAcquireFuture && !lockAcquireFuture.isDone()) {
+                lockAcquireFuture.cancel(true);
             }
             lockToClose = readLock;
         }
         return Utils.closeSequence(scheduler, lockToClose)
-                .flatMap(new AbstractFunction1<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
+            .thenApply((value) -> {
                 // unregister the log segment listener
                 
metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), 
BKLogReadHandler.this);
-                return Future.Void();
-            }
-        });
+                return null;
+            });
     }
 
     @Override
-    public Future<Void> asyncAbort() {
+    public CompletableFuture<Void> asyncAbort() {
         return asyncClose();
     }
 
@@ -277,18 +254,18 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
      *
      * @return future represents the fetch result
      */
-    Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() {
-        Promise<Versioned<List<LogSegmentMetadata>>> promise =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
+    CompletableFuture<Versioned<List<LogSegmentMetadata>>> 
asyncStartFetchLogSegments() {
+        CompletableFuture<Versioned<List<LogSegmentMetadata>>> promise =
+                new CompletableFuture<Versioned<List<LogSegmentMetadata>>>();
         asyncStartFetchLogSegments(promise);
         return promise;
     }
 
-    void asyncStartFetchLogSegments(final 
Promise<Versioned<List<LogSegmentMetadata>>> promise) {
+    void asyncStartFetchLogSegments(final 
CompletableFuture<Versioned<List<LogSegmentMetadata>>> promise) {
         readLogSegmentsFromStore(
                 LogSegmentMetadata.COMPARATOR,
                 LogSegmentFilter.DEFAULT_FILTER,
-                this).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                this).whenComplete(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
                 if (cause instanceof LogNotFoundException ||
@@ -298,7 +275,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
                     metadataException.compareAndSet(null, (IOException) cause);
                     // notify the reader that read handler is in error state
                     notifyReaderOnError(cause);
-                    FutureUtils.setException(promise, cause);
+                    FutureUtils.completeExceptionally(promise, cause);
                     return;
                 }
                 scheduler.schedule(new Runnable() {
@@ -312,7 +289,7 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
             @Override
             public void onSuccess(Versioned<List<LogSegmentMetadata>> 
segments) {
                 // no-op
-                FutureUtils.setValue(promise, segments);
+                FutureUtils.complete(promise, segments);
             }
         });
     }
@@ -332,9 +309,9 @@ class BKLogReadHandler extends BKLogHandler implements 
LogSegmentNamesListener {
             }
         }
 
-        Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise =
-                new Promise<Versioned<List<LogSegmentMetadata>>>();
-        readLogSegmentsPromise.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+        CompletableFuture<Versioned<List<LogSegmentMetadata>>> 
readLogSegmentsPromise =
+                new CompletableFuture<Versioned<List<LogSegmentMetadata>>>();
+        readLogSegmentsPromise.whenComplete(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
             @Override
             public void onFailure(Throwable cause) {
                 if (cause instanceof LogNotFoundException ||

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 6b60c77..a4016c8 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -20,6 +20,8 @@ package org.apache.distributedlog;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import java.util.function.Function;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
@@ -47,20 +50,15 @@ import org.apache.distributedlog.io.CompressionUtils;
 import org.apache.distributedlog.lock.DistributedLock;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentWriter;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.stats.OpStatsListener;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.common.stats.OpStatsListener;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.SafeQueueingFuturePool;
+import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.util.SimplePermitLimiter;
-import org.apache.distributedlog.util.Sizable;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Promise;
+import org.apache.distributedlog.common.util.Sizable;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -73,10 +71,9 @@ import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
@@ -146,7 +143,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = 
new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<Exception> scheduledFlushException = new 
AtomicReference<Exception>(null);
     private boolean enforceLock = true;
-    private Promise<Void> closeFuture = null;
+    private CompletableFuture<Void> closeFuture = null;
     private final boolean enableRecordCounts;
     private int positionWithinLogSegment = 0;
     private final long logSegmentSequenceNumber;
@@ -170,28 +167,17 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     private final OpStatsLogger addCompleteDeferredTime;
     private final Counter pendingWrites;
 
-    // add complete processing
-    private final SafeQueueingFuturePool<Void> addCompleteFuturePool;
-
     // Functions
-    private final AbstractFunction1<Integer, Future<Long>> 
GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
-            new AbstractFunction1<Integer, Future<Long>>() {
-                @Override
-                public Future<Long> apply(Integer transmitRc) {
-                    if (BKException.Code.OK == transmitRc) {
-                        return Future.value(getLastTxIdAcknowledged());
-                    } else {
-                        return Future.exception(new 
BKTransmitException("Failed to transmit entry", transmitRc));
-                    }
-                }
-            };
-    final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC =
-            new AbstractFunction1<Long, Future<Long>>() {
-                @Override
-                public Future<Long> apply(Long lastAckedTxId) {
-                    return commit();
-                }
-            };
+    private final Function<Integer, CompletableFuture<Long>> 
GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
+        transmitRc -> {
+            if (BKException.Code.OK == transmitRc) {
+                return FutureUtils.value(getLastTxIdAcknowledged());
+            } else {
+                return FutureUtils.exception(new BKTransmitException("Failed 
to transmit entry", transmitRc));
+            }
+        };
+    final Function<Long, CompletableFuture<Long>> COMMIT_AFTER_FLUSH_FUNC =
+        lastAckedTxId -> commit();
 
     private final AlertStatsLogger alertStatsLogger;
     private final WriteLimiter writeLimiter;
@@ -341,11 +327,6 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         }
 
         this.conf = conf;
-        if (null != scheduler) {
-            this.addCompleteFuturePool = new 
SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName));
-        } else {
-            this.addCompleteFuturePool = null;
-        }
         assert(!this.immediateFlushEnabled || (null != this.scheduler));
         this.lastTransmit = Stopwatch.createStarted();
     }
@@ -360,11 +341,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     @VisibleForTesting
-    FuturePool getFuturePool() {
-        if (null == scheduler) {
-            return null;
-        }
-        return scheduler.getFuturePool(streamName);
+    ScheduledExecutorService getFuturePool() {
+        return scheduler.chooseExecutor(streamName);
     }
 
     @VisibleForTesting
@@ -471,21 +449,15 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         return closeInternal(false);
     }
 
     @Override
-    public Future<Void> asyncAbort() {
+    public CompletableFuture<Void> asyncAbort() {
         return closeInternal(true);
     }
 
-    private void flushAddCompletes() {
-        if (null != addCompleteFuturePool) {
-            addCompleteFuturePool.close();
-        }
-    }
-
     private synchronized void abortPacket(BKTransmitPacket packet) {
         long numRecords = 0;
         if (null != packet) {
@@ -495,7 +467,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             if (BKException.Code.OK == rc) {
                 rc = BKException.Code.InterruptedException;
             }
-            Throwable reason = new WriteCancelledException(streamName, 
FutureUtils.transmitException(rc));
+            Throwable reason = new WriteCancelledException(streamName, 
Utils.transmitException(rc));
             recordSet.abortTransmit(reason);
         }
         LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, 
numRecords);
@@ -509,21 +481,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         }
     }
 
-    private synchronized long getPendingAddCompleteCount() {
-        if (null != addCompleteFuturePool) {
-            return addCompleteFuturePool.size();
-        } else {
-            return 0;
-        }
-    }
-
-    private Future<Void> closeInternal(boolean abort) {
-        Promise<Void> closePromise;
+    private CompletableFuture<Void> closeInternal(boolean abort) {
+        CompletableFuture<Void> closePromise;
         synchronized (this) {
             if (null != closeFuture) {
                 return closeFuture;
             }
-            closePromise = closeFuture = new Promise<Void>();
+            closePromise = closeFuture = new CompletableFuture<Void>();
         }
 
         AtomicReference<Throwable> throwExc = new 
AtomicReference<Throwable>(null);
@@ -533,7 +497,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
-                               final Promise<Void> closePromise) {
+                               final CompletableFuture<Void> closePromise) {
         // clean stats resources
         this.transmitOutstandingLogger.unregisterGauge("requests", 
transmitOutstandingGauge);
         this.writeLimiter.close();
@@ -560,7 +524,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         if (!abort && !isLogSegmentInError()) {
             this.enforceLock = false;
             LOG.info("Flushing before closing log segment {}", 
getFullyQualifiedLogSegment());
-            flushAndCommit().addEventListener(new FutureEventListener<Long>() {
+            flushAndCommit().whenComplete(new FutureEventListener<Long>() {
                 @Override
                 public void onSuccess(Long value) {
                     abortTransmitPacketOnClose(abort, throwExc, closePromise);
@@ -580,11 +544,11 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
     private void abortTransmitPacketOnClose(final boolean abort,
                                             final AtomicReference<Throwable> 
throwExc,
-                                            final Promise<Void> closePromise) {
+                                            final CompletableFuture<Void> 
closePromise) {
         LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" +
-                        " lastDLSN = {} outstandingTransmits = {} 
writesPendingTransmit = {} addCompletesPending = {}",
+                        " lastDLSN = {} outstandingTransmits = {} 
writesPendingTransmit = {}",
                 new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
-                        outstandingTransmits.get(), 
getWritesPendingTransmit(), getPendingAddCompleteCount()});
+                        outstandingTransmits.get(), 
getWritesPendingTransmit()});
 
         // Save the current packet to reset, leave a new empty packet to avoid a race with
         // addCompleteDeferredProcessing.
@@ -602,7 +566,6 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             packetPreviousSaved.addTransmitCompleteListener(new 
FutureEventListener<Integer>() {
                 @Override
                 public void onSuccess(Integer transmitResult) {
-                    flushAddCompletes();
                     abortPacket(packetCurrentSaved);
                 }
                 @Override
@@ -620,7 +583,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
     private void closeLedgerOnClose(final boolean abort,
                                     final AtomicReference<Throwable> throwExc,
-                                    final Promise<Void> closePromise) {
+                                    final CompletableFuture<Void> 
closePromise) {
         // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback.
         if (null == throwExc.get() && !isLogSegmentInError()) {
             // Synchronous closing the ledger handle, if we couldn't close a ledger handle successfully.
@@ -644,16 +607,16 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
 
     private void completeClosePromise(final boolean abort,
                                       final AtomicReference<Throwable> 
throwExc,
-                                      final Promise<Void> closePromise) {
+                                      final CompletableFuture<Void> 
closePromise) {
         // If add entry failed because of closing ledger above, we don't need to fail the close operation
         if (!abort && null == throwExc.get() && 
shouldFailCompleteLogSegment()) {
             throwExc.set(new BKTransmitException("Closing an errored stream : 
", transmitResult.get()));
         }
 
         if (null == throwExc.get()) {
-            FutureUtils.setValue(closePromise, null);
+            FutureUtils.complete(closePromise, null);
         } else {
-            FutureUtils.setException(closePromise, throwExc.get());
+            FutureUtils.completeExceptionally(closePromise, throwExc.get());
         }
     }
 
@@ -664,12 +627,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     @Override
-    synchronized public Future<DLSN> asyncWrite(LogRecord record) {
+    synchronized public CompletableFuture<DLSN> asyncWrite(LogRecord record) {
         return asyncWrite(record, true);
     }
 
-    synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean 
flush) {
-        Future<DLSN> result = null;
+    synchronized public CompletableFuture<DLSN> asyncWrite(LogRecord record, 
boolean flush) {
+        CompletableFuture<DLSN> result = null;
         try {
             if (record.isControl()) {
                 // we don't pack control records with user records together
@@ -677,7 +640,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 try {
                     transmit();
                 } catch (IOException ioe) {
-                    return Future.exception(new 
WriteCancelledException(fullyQualifiedLogSegment, ioe));
+                    return FutureUtils.exception(new 
WriteCancelledException(fullyQualifiedLogSegment, ioe));
                 }
                 result = writeControlLogRecord(record);
                 transmit();
@@ -685,7 +648,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 result = writeUserRecord(record);
                 if (!isDurableWriteEnabled) {
                     // we have no idea about the DLSN if durability is turned off.
-                    result = Future.value(DLSN.InvalidDLSN);
+                    result = FutureUtils.value(DLSN.InvalidDLSN);
                 }
                 if (flush) {
                     flushIfNeeded();
@@ -697,7 +660,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             if (null != result) {
                 LOG.error("Overriding first result with flush failure {}", 
result);
             }
-            result = Future.exception(ioe);
+            result = FutureUtils.exception(ioe);
 
             // Flush to ensure any prev. writes with flush=false are flushed despite failure.
             flushIfNeededNoThrow();
@@ -705,7 +668,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         return result;
     }
 
-    synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws 
IOException {
+    synchronized private CompletableFuture<DLSN> writeUserRecord(LogRecord 
record) throws IOException {
         if (null != closeFuture) {
             throw new WriteException(fullyQualifiedLogSegment, 
BKException.getMessage(BKException.Code.LedgerClosedException));
         }
@@ -737,7 +700,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         // Internally generated log records don't increment the count
         // writeInternal will always set a count regardless of whether it was
         // incremented or not.
-        Future<DLSN> future = null;
+        CompletableFuture<DLSN> future = null;
         try {
             // increment the position for the record to write
             // if the record is failed to write, it would be decremented.
@@ -759,12 +722,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         }
 
         // Track outstanding requests and return the future.
-        return future.ensure(new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                pendingWrites.dec();
-                writeLimiter.release();
-                return null;
-            }
+        return FutureUtils.ensure(future, () -> {
+            pendingWrites.dec();
+            writeLimiter.release();
         });
     }
 
@@ -777,7 +737,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                 (transmitResult.get() != 
BKException.Code.LedgerClosedException);
     }
 
-    synchronized public Future<DLSN> writeInternal(LogRecord record)
+    synchronized public CompletableFuture<DLSN> writeInternal(LogRecord record)
             throws LogRecordTooLongException, LockingException, 
BKTransmitException,
                    WriteException, InvalidEnvelopedEntryException {
         int logRecordSize = record.getPersistentSize();
@@ -802,8 +762,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             record.setPositionWithinLogSegment(positionWithinLogSegment);
         }
 
-        Promise<DLSN> writePromise = new Promise<DLSN>();
-        writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime));
+        CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
+        writePromise.whenComplete(new OpStatsListener<DLSN>(writeTime));
         recordSetWriter.writeRecord(record, writePromise);
 
         if (record.getTransactionId() < lastTxId) {
@@ -818,7 +778,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         return writePromise;
     }
 
-    synchronized private Future<DLSN> writeControlLogRecord()
+    synchronized private CompletableFuture<DLSN> writeControlLogRecord()
             throws BKTransmitException, WriteException, 
InvalidEnvelopedEntryException,
                    LockingException, LogRecordTooLongException {
         LogRecord controlRec = new LogRecord(lastTxId, 
DistributedLogConstants.CONTROL_RECORD_CONTENT);
@@ -826,7 +786,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         return writeControlLogRecord(controlRec);
     }
 
-    synchronized private Future<DLSN> writeControlLogRecord(LogRecord record)
+    synchronized private CompletableFuture<DLSN> 
writeControlLogRecord(LogRecord record)
             throws BKTransmitException, WriteException, 
InvalidEnvelopedEntryException,
                    LockingException, LogRecordTooLongException {
         return writeInternal(record);
@@ -851,12 +811,12 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
      * as read-only in the metadata. No appends to the
      * stream will be allowed after this point
      */
-    public Future<Long> markEndOfStream() {
+    public CompletableFuture<Long> markEndOfStream() {
         synchronized (this) {
             try {
                 writeEndOfStreamMarker();
             } catch (IOException e) {
-                return Future.exception(e);
+                return FutureUtils.exception(e);
             }
             streamEnded = true;
         }
@@ -905,60 +865,60 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
     }
 
     @Override
-    public synchronized Future<Long> flush() {
+    public synchronized CompletableFuture<Long> flush() {
         try {
             checkStateBeforeTransmit();
         } catch (WriteException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
 
-        Future<Integer> transmitFuture;
+        CompletableFuture<Integer> transmitFuture;
         try {
             transmitFuture = transmit();
         } catch (BKTransmitException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         } catch (LockingException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         } catch (WriteException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         } catch (InvalidEnvelopedEntryException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
 
         if (null == transmitFuture) {
             if (null != packetPrevious) {
                 transmitFuture = packetPrevious.getTransmitFuture();
             }  else {
-                return Future.value(getLastTxIdAcknowledged());
+                return FutureUtils.value(getLastTxIdAcknowledged());
             }
         }
 
-        return 
transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+        return 
transmitFuture.thenCompose(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
     }
 
     @Override
-    public synchronized Future<Long> commit() {
+    public synchronized CompletableFuture<Long> commit() {
         // we don't pack control records with user records together
         // so transmit current output buffer if possible
-        Future<Integer> transmitFuture;
+        CompletableFuture<Integer> transmitFuture;
         try {
             try {
                 transmitFuture = transmit();
             } catch (IOException ioe) {
-                return Future.exception(ioe);
+                return FutureUtils.exception(ioe);
             }
             if (null == transmitFuture) {
                 writeControlLogRecord();
                 return flush();
             }
         } catch (IOException ioe) {
-            return Future.exception(ioe);
+            return FutureUtils.exception(ioe);
         }
-        return 
transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+        return 
transmitFuture.thenCompose(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
     }
 
-    Future<Long> flushAndCommit() {
-        return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC);
+    CompletableFuture<Long> flushAndCommit() {
+        return flush().thenCompose(COMMIT_AFTER_FLUSH_FUNC);
     }
 
     void flushIfNeededNoThrow() {
@@ -1054,7 +1014,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
      * @throws WriteException if failed to create the envelope for the data to transmit
      * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
      */
-    private Future<Integer> transmit()
+    private CompletableFuture<Integer> transmit()
         throws BKTransmitException, LockingException, WriteException, 
InvalidEnvelopedEntryException {
         EntryBuffer recordSetToTransmit;
         transmitLock.lock();
@@ -1183,10 +1143,11 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
         }
 
-        if (null != addCompleteFuturePool) {
+        if (null != scheduler) {
             final Stopwatch queuedTime = Stopwatch.createStarted();
-            addCompleteFuturePool.apply(new Function0<Void>() {
-                public Void apply() {
+            scheduler.submit(streamName, new Callable<Void>() {
+                @Override
+                public Void call() {
                     final Stopwatch deferredTime = Stopwatch.createStarted();
                     
addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
                     addCompleteDeferredProcessing(transmitPacket, entryId, 
effectiveRC.get());
@@ -1198,7 +1159,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
                     return String.format("AddComplete(Stream=%s, entryId=%d, 
rc=%d)",
                             fullyQualifiedLogSegment, entryId, rc);
                 }
-            }).addEventListener(new FutureEventListener<Void>() {
+            }).whenComplete(new FutureEventListener<Void>() {
                 @Override
                 public void onSuccess(Void done) {
                 }
@@ -1278,7 +1239,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
         if (BKException.Code.OK == transmitResult.get()) {
             recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
         } else {
-            
recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get()));
+            
recordSet.abortTransmit(Utils.transmitException(transmitResult.get()));
         }
 
         if (cancelPendingPromises) {
@@ -1292,7 +1253,7 @@ class BKLogSegmentWriter implements LogSegmentWriter, 
AddCallback, Runnable, Siz
             }
             packetCurrentSaved.getRecordSet().abortTransmit(
                     new WriteCancelledException(streamName,
-                            
FutureUtils.transmitException(transmitResult.get())));
+                            Utils.transmitException(transmitResult.get())));
         }
     }
 

Reply via email to