tombentley commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r662856714



##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +173,77 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeRewrapAndThrow(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {
+        try {
+            return completableFuture.getNow(valueIfAbsent);
+        } catch (CompletionException e) {
+            maybeRewrapAndThrow(e.getCause());
+            // Note, unlike CF#get() which throws ExecutionException, 
CF#getNow() throws CompletionException
+            // thus needs rewrapping to conform to KafkaFuture API, where 
KF#getNow() throws ExecutionException.
+            throw new ExecutionException(e.getCause());
+        }
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it 
completed normally.
      */
     @Override
     public synchronized boolean isCancelled() {
-        return exception instanceof CancellationException;
+        if (isDependant) {
+            Throwable exception;
+            try {
+                completableFuture.getNow(null);
+                return false;
+            } catch (Exception e) {
+                exception = e;
+            }
+            return exception instanceof CompletionException
+                    && exception.getCause() instanceof CancellationException;

Review comment:
       The historical API of `KafkaFuture` on cancellation of an upstream 
future (e.g. `thenApply()`) was for both futures to return true from 
`isDone()`, `isCancelled()` and `isCompletedExceptionally()`, and for both 
`getNow` and `get()` to throw a `CancellationException`. 
   
   If the new implementation of `isCancelled()` just does `return 
completableFuture.isCancelled();` then for a dependent future `isCancelled()` 
would return false because it's exception is a `CompletionException` wrapping a 
`CancellationException` and `CompleteableFuture#isCancelled()` just checks for 
the exception being `CancellationException`. So we need to do something 
different for cancellation of dependent futures. 
   
   Because `KafkaFutureImpl.thenApply()` doesn't itself keep track of waiters 
any more it's not possible to do anything clever in the implementation of 
`cancel()`, so we have to do something in `isCancelled()`.
   
   So to (hopefully) answer your question: It does propagate already, but the 
APIs differ and what we're doing here is to maintain the existing semantics of 
`KafkaFuture`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to