kirktrue commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1849333895


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -230,6 +234,10 @@ private ClientRequest makeClientRequest(
             unsent.handler
         );
     }
+    
+    public CompletableFuture<RuntimeException> metadataError() {
+        return metadataException;
+    }

Review Comment:
   Any reason not to match the names?
   
   ```suggestion
       public CompletableFuture<RuntimeException> metadataException() {
           return metadataException;
       }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -150,6 +152,8 @@ private void maybePropagateMetadataError() {
         try {
             metadata.maybeThrowAnyException();
         } catch (Exception e) {
+            metadataException.completeExceptionally(e);
+            metadataException = metadataException.newIncompleteFuture();

Review Comment:
   What's the benefit of `newIncompleteFuture()` over `new CompletableFuture()`?
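
For context, a minimal sketch of how the two calls differ, assuming Java 9 or later (where `newIncompleteFuture()` was introduced). `TracedFuture` is a hypothetical subclass made up for this illustration; it is not part of Kafka. On a plain `CompletableFuture`, both calls produce an equivalent fresh future, so the distinction only matters if the field ever holds a subclass.

```java
import java.util.concurrent.CompletableFuture;

class NewIncompleteFutureDemo {

    // Hypothetical subclass, used only to show what newIncompleteFuture() preserves.
    static class TracedFuture<T> extends CompletableFuture<T> {
        @Override
        public <U> CompletableFuture<U> newIncompleteFuture() {
            return new TracedFuture<>();
        }
    }

    // On a plain CompletableFuture the result is another plain, incomplete future,
    // even if the original has already been completed exceptionally.
    static boolean plainReturnsPlain() {
        CompletableFuture<RuntimeException> current = new CompletableFuture<>();
        current.completeExceptionally(new IllegalStateException("metadata error"));
        CompletableFuture<RuntimeException> next = current.newIncompleteFuture();
        return next.getClass() == CompletableFuture.class && !next.isDone() && next != current;
    }

    // On a subclass that overrides the method, the fresh future keeps the subclass type.
    static boolean subclassReturnsSubclass() {
        CompletableFuture<String> current = new TracedFuture<>();
        return current.newIncompleteFuture() instanceof TracedFuture;
    }

    public static void main(String[] args) {
        System.out.println("plain -> plain: " + plainReturnsPlain());
        System.out.println("subclass -> subclass: " + subclassReturnsSubclass());
    }
}
```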



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -206,12 +213,20 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         " Result {}", timestampsToSearch, result);
             }
         });
-
+        

Review Comment:
   Nit: whitespace cleanup.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -206,12 +213,20 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         " Result {}", timestampsToSearch, result);
             }
         });
-
+        
         prepareFetchOffsetsRequests(timestampsToSearch, requireTimestamps, listOffsetsRequestState);
-        return listOffsetsRequestState.globalResult.thenApply(
+
+        topicToOffsetsResult.set(listOffsetsRequestState.globalResult.thenApply(
                 result -> OffsetFetcherUtils.buildOffsetsForTimeInternalResult(
                         timestampsToSearch,
-                        result.fetchedOffsets));
+                        result.fetchedOffsets)));
+
+        new ArrayList<>(metadataErrors).forEach(metadataError -> metadataError.whenComplete((__, error) -> {
+            topicToOffsetsResult.get().completeExceptionally(error);

Review Comment:
   This looks like we're completing the same `Future` multiple times 🤔 
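
To illustrate the concern, here is a small sketch of what `CompletableFuture` does when several callbacks try to complete the same future. The class and method names are invented for the example, and the futures are stand-ins for the ones in the diff, not the actual Kafka objects. Only the first completion takes effect; later calls to `completeExceptionally` return `false` and are silently ignored.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RepeatedCompletionDemo {

    // Returns the message of the exception the future failed with, or null if it did not fail.
    static String failureMessage(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    // Two error futures both try to fail the same result; only the first one wins.
    static String firstErrorWins() {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<RuntimeException> errorA = new CompletableFuture<>();
        CompletableFuture<RuntimeException> errorB = new CompletableFuture<>();
        List<CompletableFuture<RuntimeException>> metadataErrors = List.of(errorA, errorB);

        metadataErrors.forEach(metadataError ->
            metadataError.whenComplete((ignored, error) -> result.completeExceptionally(error)));

        errorA.completeExceptionally(new IllegalStateException("first"));
        errorB.completeExceptionally(new IllegalStateException("second"));
        return failureMessage(result);
    }

    // completeExceptionally reports whether this particular call completed the future.
    static boolean secondCallIsIgnored() {
        CompletableFuture<String> result = new CompletableFuture<>();
        boolean first = result.completeExceptionally(new IllegalStateException("first"));
        boolean second = result.completeExceptionally(new IllegalStateException("second"));
        return first && !second;
    }

    public static void main(String[] args) {
        System.out.println("surviving error: " + firstErrorWins());
        System.out.println("second call ignored: " + secondCallIsIgnored());
    }
}
```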



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -186,9 +187,15 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets(
             Map<TopicPartition, Long> timestampsToSearch,
-            boolean requireTimestamps) {
+            boolean requireTimestamps,
+            List<CompletableFuture<RuntimeException>> metadataErrors
+    ) {
+        AtomicReference<CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>>>
+                topicToOffsetsResult = new AtomicReference<>();
+        
         if (timestampsToSearch.isEmpty()) {
-            return CompletableFuture.completedFuture(Collections.emptyMap());
+            topicToOffsetsResult.get().complete(Collections.emptyMap());
+            return topicToOffsetsResult.get();

Review Comment:
   Wouldn't this throw a `NullPointerException`?
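
A minimal reproduction of the problem, with `Map<String, Long>` standing in for the Kafka types so the snippet is self-contained (the class and method names are invented for the example). An `AtomicReference` created with the no-argument constructor holds `null`, so dereferencing `get()` before any `set()` fails. The removed line's `CompletableFuture.completedFuture(...)` avoids this.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class EmptyAtomicReferenceDemo {

    // Mirrors the pattern in the diff: get() is called before anything was set.
    static boolean emptyReferenceThrows() {
        AtomicReference<CompletableFuture<Map<String, Long>>> ref = new AtomicReference<>();
        try {
            ref.get().complete(Collections.emptyMap()); // ref.get() is null here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // The behavior of the removed line: return an already-completed future directly.
    static CompletableFuture<Map<String, Long>> emptyResult() {
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    public static void main(String[] args) {
        System.out.println("NPE thrown: " + emptyReferenceThrows());
        System.out.println("empty result: " + emptyResult().join());
    }
}
```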



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -186,9 +187,15 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets(
             Map<TopicPartition, Long> timestampsToSearch,
-            boolean requireTimestamps) {
+            boolean requireTimestamps,
+            List<CompletableFuture<RuntimeException>> metadataErrors
+    ) {
+        AtomicReference<CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>>>
+                topicToOffsetsResult = new AtomicReference<>();

Review Comment:
   I'm not following why this needs to be wrapped in an `AtomicReference`. Everything here runs on the background thread, right? As such, it doesn't need to be thread safe. (That's a major design goal of the new consumer.)
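
One possible shape without the wrapper, sketched under the assumption stated above that all of this runs on a single thread. The method name `lengthOrError` and the `String`/`Integer` types are placeholders rather than Kafka code. Assigning the derived future once to a `final` local is enough for the lambda to capture it.

```java
import java.util.concurrent.CompletableFuture;

class PlainLocalDemo {

    // Derives a result from `source` and fails it early if a metadata error arrives first.
    static CompletableFuture<Integer> lengthOrError(
            CompletableFuture<String> source,
            CompletableFuture<RuntimeException> metadataError) {
        // Assigned exactly once, so the lambda below can capture it without an AtomicReference.
        final CompletableFuture<Integer> result = source.thenApply(String::length);

        metadataError.whenComplete((ignored, error) -> {
            if (error != null) {
                // No effect if `result` has already been completed.
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<RuntimeException> metadataError = new CompletableFuture<>();
        CompletableFuture<Integer> result = lengthOrError(source, metadataError);

        metadataError.completeExceptionally(new IllegalStateException("metadata error"));
        System.out.println("failed: " + result.isCompletedExceptionally());
    }
}
```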



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.