dajac commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r662860700



##########
File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +173,77 @@ public T get() throws InterruptedException, ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeRewrapAndThrow(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {
+        try {
+            return completableFuture.getNow(valueIfAbsent);
+        } catch (CompletionException e) {
+            maybeRewrapAndThrow(e.getCause());
+            // Note, unlike CF#get() which throws ExecutionException, CF#getNow() throws CompletionException
+            // thus needs rewrapping to conform to KafkaFuture API, where KF#getNow() throws ExecutionException.
+            throw new ExecutionException(e.getCause());
+        }
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it completed normally.
      */
     @Override
     public synchronized boolean isCancelled() {
-        return exception instanceof CancellationException;
+        if (isDependant) {
+            Throwable exception;
+            try {
+                completableFuture.getNow(null);
+                return false;
+            } catch (Exception e) {
+                exception = e;
+            }
+            return exception instanceof CompletionException
+                    && exception.getCause() instanceof CancellationException;

Review comment:
       Oh, I see. Thanks for the clarification. It would be great if you could
add a comment explaining this, because it is not obvious from the code.
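
For readers of this thread, here is a minimal standalone sketch of the JDK behaviour that the `isDependant` branch appears to work around. It uses only `java.util.concurrent.CompletableFuture`; the class name `DependentCancellationDemo` and the helper `dependentSeesCancellation` are hypothetical names made up for illustration and are not part of `KafkaFutureImpl`. The sketch assumes the point under discussion is that a dependent future does not report `isCancelled()` when its source is cancelled, and instead carries the `CancellationException` wrapped in a `CompletionException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class DependentCancellationDemo {

    // Hypothetical helper that mirrors the check in the diff above:
    // probe the future with getNow() and inspect the wrapped cause.
    static boolean dependentSeesCancellation(CompletableFuture<?> future) {
        try {
            future.getNow(null);
            return false; // not completed yet, or completed normally
        } catch (CompletionException e) {
            // A dependent of a cancelled source lands here: the
            // CancellationException arrives wrapped in a CompletionException.
            return e.getCause() instanceof CancellationException;
        } catch (CancellationException e) {
            // A directly cancelled future throws the bare exception;
            // isCancelled() already covers that case.
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);

        source.cancel(true);

        System.out.println(source.isCancelled());                 // true
        System.out.println(dependent.isCancelled());              // false
        System.out.println(dependent.isCompletedExceptionally()); // true
        System.out.println(dependentSeesCancellation(dependent)); // true
    }
}
```

A code comment along these lines, placed next to the `isDependant` branch, would probably be enough to make the intent clear.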



