ahuang98 commented on code in PR #19800:
URL: https://github.com/apache/kafka/pull/19800#discussion_r2112701439


##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -264,13 +267,15 @@ public short partitionRecordVersion() {
     }
 
     public short fetchRequestVersion() {
-        if (this.isAtLeast(IBP_3_9_IV0)) {
+        if (isAtLeast(IBP_4_1_IV1)) {
+            return 18;

Review Comment:
   should have thought about this more during the KIP review - can you remind 
me why the default value of the new HW field would ever be used? if the fetch 
request version to use is dictated by MV, then shouldn't all nodes agree on 
which fetch request/response version to use?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -116,6 +116,9 @@ public enum MetadataVersion {
     // Streams groups are early access in 4.1 (KIP-1071).
     IBP_4_1_IV0(26, "4.1", "IV0", false),
 
+    // Send FETCH verion 18 in the replica fetcher (KIP-1166)

Review Comment:
   nit: "version"



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1556,26 +1572,25 @@ private CompletableFuture<FetchResponseData> 
handleFetchRequest(
                         Optional.empty()
                     );
                 }
-            }
-
-            // FIXME: `completionTimeMs`, which can be null
-            logger.trace(
-                "Completing delayed fetch from {} starting at offset {} at {}",
-                replicaKey,
-                fetchPartition.fetchOffset(),
-                completionTimeMs
-            );
+            } else {
+                logger.trace(
+                    "Completing delayed fetch from {} starting at offset {} at 
{}",
+                    replicaKey,
+                    fetchPartition.fetchOffset(),
+                    completionTimeMs
+                );
 
-            // It is safe to call tryCompleteFetchRequest because only the 
polling thread completes this
-            // future successfully. This is true because only the polling 
thread appends record batches to
-            // the log from maybeAppendBatches.
-            return tryCompleteFetchRequest(
-                requestMetadata.listenerName(),
-                requestMetadata.apiVersion(),
-                replicaKey,
-                fetchPartition,
-                time.milliseconds()
-            );
+                // It is safe to call tryCompleteFetchRequest because only the 
polling thread completes this
+                // future successfully. This is true because only the polling 
thread appends record batches to
+                // the log from maybeAppendBatches.
+                return tryCompleteFetchRequest(
+                    requestMetadata.listenerName(),
+                    requestMetadata.apiVersion(),
+                    replicaKey,
+                    fetchPartition,
+                    completionTimeMs

Review Comment:
   nice catch



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -386,6 +385,11 @@ private void onUpdateLeaderHighWatermark(
             // records still held in memory directly to the listener
             appendPurgatory.maybeComplete(highWatermark.offset(), 
currentTimeMs);
 
+            // After updating the high-watermark, complete all of the deferred
+            // fetch requests. This is always correct because all fetch request
+            // deffered have a HWM less or equal to the previous leader's HWM.

Review Comment:
   nit: deferred 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to