[ 
https://issues.apache.org/jira/browse/KAFKA-6987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704611#comment-16704611
 ] 

ASF GitHub Bot commented on KAFKA-6987:
---------------------------------------

andrasbeni closed pull request #5131: KAFKA-6987 Reimplement KafkaFuture with 
CompletableFuture
URL: https://github.com/apache/kafka/pull/5131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01dc42..4afe3c530bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -19,14 +19,19 @@
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A flexible future which supports call chaining and other asynchronous 
programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous 
programming patterns. This
+ * is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to 
CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with 
CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
  *
  * The API for this class is still evolving and we may break compatibility in 
minor releases, if necessary.
  */
@@ -202,4 +207,13 @@ public abstract T get(long timeout, TimeUnit unit) throws 
InterruptedException,
      */
     @Override
     public abstract boolean isDone();
+
+    /**
+     * Returns a CompletableFuture equivalent to this Future.
+     *
+     * The default implementation in {@link KafkaFuture} throws UnsupportedOperationException.
+     */
+    public CompletableFuture<T> toCompletableFuture() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac952a..fd9093d763a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,124 +16,38 @@
  */
 package org.apache.kafka.common.internals;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.kafka.common.KafkaFuture;
+
+
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.common.KafkaFuture;
-
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
- * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
+ * This is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to 
CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with 
CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
+ *
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if 
needed.
-     *
-     * In general, KafkaFuture throws CancellationException and 
InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, 
ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
-
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private CompletableFuture<T> completableFuture;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(new CompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException 
{
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, 
TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    private KafkaFutureImpl(CompletableFuture<T> future) {
+        this.completableFuture = future;
     }
 
-    /**
-     * True if this future is done.
-     */
-    private boolean done = false;
 
-    /**
-     * The value of this future, or null.  Protected by the object monitor.
-     */
-    private T value = null;
-
-    /**
-     * The exception associated with this future, or null.  Protected by the 
object monitor.
-     */
-    private Throwable exception = null;
-
-    /**
-     * A list of objects waiting for this future to complete (either 
successfully or
-     * exceptionally).  Protected by the object monitor.
-     */
-    private List<BiConsumer<? super T, ? super Throwable>> waiters = new 
ArrayList<>();
 
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
@@ -141,14 +55,19 @@ R await(long timeout, TimeUnit unit)
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
-        addWaiter(new Applicant<>(function, future));
-        return future;
+        return new 
KafkaFutureImpl<R>(completableFuture.thenApply(function::apply));
     }
 
+    @Deprecated
     public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> 
function) {
-        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
-        futureImpl.addWaiter(new Applicant<>(function, this));
+        ((KafkaFutureImpl<R>) 
future).completableFuture.thenApply(function::apply).whenComplete((t, 
throwable) -> {
+            if (throwable != null) {
+                completableFuture.completeExceptionally(throwable);
+            } else {
+                completableFuture.complete(t);
+            }
+        });
+
     }
 
     /**
@@ -156,100 +75,68 @@ R await(long timeout, TimeUnit unit)
      */
     @Override
     public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
-        return thenApply((BaseFunction<T, R>) function);
+        return new 
KafkaFutureImpl<>(completableFuture.thenApply(function::apply));
     }
 
-    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, 
Throwable> {
-        private final KafkaFutureImpl<T> future;
-        private final BiConsumer<? super T, ? super Throwable> biConsumer;
-
-        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super 
T, ? super Throwable> biConsumer) {
-            this.future = future;
-            this.biConsumer = biConsumer;
-        }
-
-        @Override
-        public void accept(T val, Throwable exception) {
+    @Override
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
+        KafkaFutureImpl<T> dependent = new KafkaFutureImpl<>();
+        completableFuture.whenComplete((t, throwable) -> {
             try {
-                if (exception != null) {
-                    biConsumer.accept(null, exception);
+                if (throwable instanceof CompletionException) {
+                    if (throwable.getCause() instanceof CancellationException) 
{
+                        biConsumer.accept(null, throwable.getCause());
+                        dependent.cancel(false);
+                    } else {
+                        biConsumer.accept(null, throwable.getCause());
+                        dependent.completeExceptionally(throwable.getCause());
+                    }
+                } else if (throwable != null) {
+                    biConsumer.accept(null, throwable);
+                    dependent.completeExceptionally(throwable);
                 } else {
-                    biConsumer.accept(val, null);
+                    biConsumer.accept(t, null);
+                    dependent.complete(t);
                 }
-            } catch (Throwable e) {
-                if (exception == null) {
-                    exception = e;
+            } catch (Exception e) {
+                Throwable throwableToCompleteWith;
+                if (throwable == null) {
+                    throwableToCompleteWith = e;
+                } else if (throwable instanceof CompletionException) {
+                    throwableToCompleteWith = throwable.getCause();
+                } else {
+                    throwableToCompleteWith = throwable;
                 }
+                dependent.completeExceptionally(throwableToCompleteWith);
             }
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                future.complete(val);
-            }
-        }
-    }
-
-    @Override
-    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
-        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
-        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
-        return future;
+        });
+        return dependent;
     }
 
     protected synchronized void addWaiter(BiConsumer<? super T, ? super 
Throwable> action) {
-        if (exception != null) {
-            action.accept(null, exception);
-        } else if (done) {
-            action.accept(value, null);
-        } else {
-            waiters.add(action);
-        }
+        completableFuture.whenComplete(action::accept);
     }
 
     @Override
-    public synchronized boolean complete(T newValue) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
-        synchronized (this) {
-            if (done)
-                return false;
-            value = newValue;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(newValue, null);
-        }
-        return true;
+    public boolean complete(T newValue) {
+        return completableFuture.complete(newValue);
     }
 
     @Override
     public boolean completeExceptionally(Throwable newException) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
-        synchronized (this) {
-            if (done)
-                return false;
-            exception = newException;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(null, newException);
-        }
-        return true;
+        return completableFuture.completeExceptionally(newException);
     }
 
     /**
      * If not already completed, completes this future with a 
CancellationException.  Dependent
      * futures that have not already completed will also complete 
exceptionally, with a
      * CompletionException caused by this CancellationException.
+     *
+     * TODO was this true?
      */
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-        if (completeExceptionally(new CancellationException()))
-            return true;
-        return exception instanceof CancellationException;
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return completableFuture.cancel(mayInterruptIfRunning);
     }
 
     /**
@@ -257,9 +144,7 @@ public synchronized boolean cancel(boolean 
mayInterruptIfRunning) {
      */
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
-        addWaiter(waiter);
-        return waiter.await();
+        return completableFuture.get();
     }
 
     /**
@@ -269,9 +154,7 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        return completableFuture.get(timeout, unit);
     }
 
     /**
@@ -279,40 +162,41 @@ public T get(long timeout, TimeUnit unit) throws 
InterruptedException, Execution
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
+        return completableFuture.getNow(valueIfAbsent);
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it 
completed normally.
      */
     @Override
-    public synchronized boolean isCancelled() {
-        return (exception != null) && (exception instanceof 
CancellationException);
+    public boolean isCancelled() {
+        return completableFuture.isCancelled();
     }
 
     /**
      * Returns true if this CompletableFuture completed exceptionally, in any 
way.
      */
     @Override
-    public synchronized boolean isCompletedExceptionally() {
-        return exception != null;
+    public boolean isCompletedExceptionally() {
+        return completableFuture.isCompletedExceptionally();
     }
 
     /**
      * Returns true if completed in any fashion: normally, exceptionally, or 
via cancellation.
      */
     @Override
-    public synchronized boolean isDone() {
-        return done;
+    public boolean isDone() {
+        return completableFuture.isDone();
     }
 
     @Override
     public String toString() {
-        return String.format("KafkaFuture{value=%s,exception=%s,done=%b}", 
value, exception, done);
+        return String.format("KafkaFuture{future=%s}", completableFuture);
+    }
+
+    @Override
+    public CompletableFuture<T> toCompletableFuture() {
+        return completableFuture;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java 
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 6f9efca7c66..4e25abc77e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -24,6 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -124,6 +125,222 @@ public Integer apply(Integer integer) {
         assertTrue(futureAppliedFail.isCompletedExceptionally());
     }
 
+    @Test
+    public void testWhenCompleteNormalCompletion() {
+        String value = "Ready to roll out!";
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        future.whenComplete(consumer);
+        future.complete(value);
+        assertTrue(consumer.getThrowable() == null);
+        assertEquals(value, consumer.getValue());
+    }
+
+    @Test
+    public void testWhenCompleteExceptionalCompletion() {
+        RuntimeException exception = new RuntimeException("I'm in deep!");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        future.whenComplete(consumer);
+        future.completeExceptionally(exception);
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteChained() {
+        String value = "Fueled up, ready to go!";
+        RuntimeException exceptionOnCompletion = new RuntimeException("I'm 
about to drop the hammer");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer firstConsumer = new QueryableBiConsumer() {
+            @Override
+            public void accept(String s, Throwable throwable) {
+                super.accept(s, throwable);
+                throw exceptionOnCompletion;
+            }
+        };
+        QueryableBiConsumer secondConsumer = new QueryableBiConsumer();
+        QueryableBiConsumer thirdConsumer = new QueryableBiConsumer();
+        KafkaFuture<String> futureByWhenComplete = 
future.whenComplete(firstConsumer);
+        future.whenComplete(secondConsumer);
+        futureByWhenComplete.whenComplete(thirdConsumer);
+        future.complete(value);
+        assertTrue(firstConsumer.getThrowable() == null);
+        assertEquals(value, firstConsumer.getValue());
+        assertTrue(secondConsumer.getThrowable() == null);
+        assertEquals(value, secondConsumer.getValue());
+        assertTrue(thirdConsumer.getValue() == null);
+        assertEquals(exceptionOnCompletion, thirdConsumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteChainedCompletedExceptionally() {
+        RuntimeException exceptionOnCompletion = new RuntimeException("Can I 
take your order?");
+        RuntimeException exceptionOnCompleteExceptionally = new 
RuntimeException("In the pipe, five by five");
+
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer firstConsumer = new QueryableBiConsumer() {
+            @Override
+            public void accept(String s, Throwable throwable) {
+                super.accept(s, throwable);
+                throw exceptionOnCompletion;
+            }
+        };
+        QueryableBiConsumer secondConsumer = new QueryableBiConsumer();
+        QueryableBiConsumer thirdConsumer = new QueryableBiConsumer();
+        KafkaFuture<String> futureByWhenComplete = 
future.whenComplete(firstConsumer);
+        future.whenComplete(secondConsumer);
+        futureByWhenComplete.whenComplete(thirdConsumer);
+
+        future.completeExceptionally(exceptionOnCompleteExceptionally);
+
+        assertTrue(firstConsumer.getThrowable() == 
exceptionOnCompleteExceptionally);
+        assertEquals(null, firstConsumer.getValue());
+        assertTrue(secondConsumer.getThrowable() == 
exceptionOnCompleteExceptionally);
+        assertEquals(null, secondConsumer.getValue());
+        assertTrue(thirdConsumer.getValue() == null);
+        assertEquals(exceptionOnCompleteExceptionally, 
thirdConsumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteAfterThenApplyThrows() {
+        String value = "What's our target?";
+        RuntimeException exceptionOnApply = new RuntimeException("What is your 
major malfunction?");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        KafkaFuture.BaseFunction<String, String> throwerFunction = s -> {
+            throw exceptionOnApply;
+        };
+
+        KafkaFuture<String> futureByThenApply = 
future.thenApply(throwerFunction);
+        KafkaFuture<String> futureByWhenComplete = 
futureByThenApply.whenComplete(consumer);
+        future.complete(value);
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exceptionOnApply, consumer.getThrowable());
+        assertTrue(futureByWhenComplete.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testWhenCompleteAfterCancel() {
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer first = new QueryableBiConsumer();
+        QueryableBiConsumer second = new QueryableBiConsumer();
+        KafkaFuture<String> futureByFirst = future.whenComplete(first);
+        KafkaFuture<String> futureBySecond = 
futureByFirst.whenComplete(second);
+
+        future.cancel(false);
+
+        assertTrue(futureByFirst.isDone());
+        assertTrue(futureByFirst.isCancelled());
+        assertTrue(first.getThrowable() instanceof CancellationException);
+        assertTrue(futureBySecond.isDone());
+        assertTrue(futureBySecond.isCancelled());
+        assertTrue(second.getThrowable() instanceof CancellationException);
+
+
+    }
+
+    @Test
+    public void testCopyWith() throws Exception {
+        String newValue = "I have returned";
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Integer> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, String::length);
+
+        dependee.complete(newValue);
+
+        assertEquals(newValue, dependee.get());
+        assertEquals(newValue.length(), (int) dependent.get());
+    }
+
+    @Test
+    public void testCopyWithFunctionFails() throws Exception {
+        String newValue = "Orders received";
+        RuntimeException exception = new RuntimeException("I can't build 
there");
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> {
+            throw exception;
+        });
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.complete(newValue);
+
+        assertEquals(newValue, dependee.get());
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+        assertEquals(newValue, dependee.get());
+
+    }
+
+    @Test
+    public void testCopyWithCompletedExceptionally() throws Exception {
+        String newValue = "Orders received";
+        RuntimeException exception = new RuntimeException("I can't build it.");
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> s + "Something in the way");
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.completeExceptionally(exception);
+
+        assertTrue(dependent.isCompletedExceptionally());
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+        assertTrue(dependee.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testCopyWithCancelled() throws Exception {
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> "Affirmative");
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.cancel(false);
+
+        assertTrue(dependee.isCompletedExceptionally());
+        assertTrue(dependent.isDone());
+        assertTrue(dependent.isCompletedExceptionally());
+        assertTrue(consumer.getValue() == null);
+        assertTrue(consumer.getThrowable() instanceof CancellationException);
+    }
+
+
+    @Test
+    public void testCancel() {
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+
+        assertTrue("Must be able to cancel", future.cancel(false));
+
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+        assertTrue(future.isDone());
+    }
+
+    private static class QueryableBiConsumer implements 
KafkaFuture.BiConsumer<String, Throwable> {
+
+        private String value;
+        private Throwable throwable;
+
+        @Override
+        public void accept(String s, Throwable throwable) {
+            this.value = s;
+            this.throwable = throwable;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        public Throwable getThrowable() {
+            return throwable;
+        }
+    }
+
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
@@ -222,4 +439,6 @@ public void testFutureTimeoutWithZeroWait() throws 
Exception {
         future.get(0, TimeUnit.MILLISECONDS);
     }
 
+
+
 }
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 70c9e770ec3..4af052bc483 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -94,6 +94,12 @@ For a detailed description of findbugs bug categories, see 
http://findbugs.sourc
         <Bug pattern="EQ_UNUSUAL"/>
     </Match>
 
+   <Match>
+        <!-- Null is a valid parameter to CompletableFuture.complete(). 
Exclude false positive. -->
+        <Source name="KafkaAdminClient.java"/>
+        <Bug pattern="NP_NULL_PARAM_DEREF_ALL_TARGETS_DANGEROUS"/>
+    </Match>
+
     <Match>
         <!-- Add a suppression for auto-generated calls to instanceof in 
kafka.utils.Json -->
         <Source name="Json.scala"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reimplement KafkaFuture with CompletableFuture
> ----------------------------------------------
>
>                 Key: KAFKA-6987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.0.0
>            Reporter: Andras Beni
>            Priority: Minor
>
> KafkaFuture documentation states:
> {{This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.}}
> With Java 7 support dropped in 2.0, it is time to get rid of custom code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to