mumrah commented on code in PR #19800:
URL: https://github.com/apache/kafka/pull/19800#discussion_r2150719050


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1514,19 +1527,22 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
             || FetchResponse.recordsSize(partitionResponse) > 0
             || request.maxWaitMs() == 0
             || isPartitionDiverged(partitionResponse)
-            || isPartitionSnapshotted(partitionResponse)) {
+            || isPartitionSnapshotted(partitionResponse)
+            || isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
             // Reply immediately if any of the following is true
             // 1. The response contains an error
             // 2. There are records in the response
             // 3. The fetching replica doesn't want to wait for the partition to contain new data
             // 4. The fetching replica needs to truncate because the log diverged
             // 5. The fetching replica needs to fetch a snapshot
+            // 6. The fetching replica should update its high-watermark

Review Comment:
   Not specific to this PR, but when debugging deferred requests in the past, I 
always wished there were trace logging I could enable to see why requests were 
being completed. For example, if request CPU is really high and requests are not 
being deferred as expected, it could be due to a maxWaitMs that is too low or 
any of the other reasons listed here. Maybe something to consider adding down 
the road.
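
   As a rough illustration of the idea, here is a minimal sketch that names the first condition forcing an immediate reply, in the same order as the comment in the diff. `FetchCompletionReason`, `Reason`, and `immediateReason` are hypothetical names chosen for this example; none of them exist in KafkaRaftClient, and the boolean parameters stand in for the checks the real code performs on the partition response.

```java
import java.util.Optional;

// Hypothetical sketch: these names are illustrative and are not part of KafkaRaftClient.
final class FetchCompletionReason {
    enum Reason {
        ERROR, HAS_RECORDS, NO_WAIT, DIVERGED, SNAPSHOT_NEEDED, HIGH_WATERMARK_UPDATED
    }

    // Returns the first condition that forces an immediate reply,
    // or an empty Optional if the fetch request can be deferred.
    static Optional<Reason> immediateReason(
        boolean hasError,
        int recordsSize,
        int maxWaitMs,
        boolean diverged,
        boolean snapshotted,
        boolean highWatermarkUpdated
    ) {
        if (hasError) return Optional.of(Reason.ERROR);
        if (recordsSize > 0) return Optional.of(Reason.HAS_RECORDS);
        if (maxWaitMs == 0) return Optional.of(Reason.NO_WAIT);
        if (diverged) return Optional.of(Reason.DIVERGED);
        if (snapshotted) return Optional.of(Reason.SNAPSHOT_NEEDED);
        if (highWatermarkUpdated) return Optional.of(Reason.HIGH_WATERMARK_UPDATED);
        return Optional.empty();
    }
}
```

   A caller could then emit the returned reason at trace level before replying, which would make it visible whether a low maxWaitMs or one of the other conditions is preventing deferral.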



##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -103,7 +105,10 @@
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
           "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
         { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
-          "about": "The directory id of the follower fetching." }
+          "about": "The directory id of the follower fetching." },
+        { "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807", "taggedVersions": "18+",
+          "tag": 1, "ignorable": true,
+          "about": "The high-watermark known by the replica. -1 if the high-watermark is not known." }

Review Comment:
   We should probably explain the default "9223372036854775807" (Long.MAX_VALUE) as well as "-1".
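
   To make the two sentinel values concrete, here is a small sketch of how they might behave. It assumes the leader replies immediately only when its own high-watermark is greater than the value sent in the request; that rule, along with the names `HighWatermarkSentinels` and `shouldReplyImmediately`, is an assumption for illustration and is not taken from the PR's isHighWatermarkUpdated.

```java
// Hypothetical sketch: not the PR's isHighWatermarkUpdated implementation.
final class HighWatermarkSentinels {
    // Value sent by a replica that does not know its high-watermark.
    static final long UNKNOWN = -1L;
    // Schema default (9223372036854775807), used when the tagged field is absent.
    static final long FIELD_DEFAULT = Long.MAX_VALUE;

    // Assumed rule: reply immediately only if the leader's high-watermark
    // is ahead of the high-watermark reported by the replica.
    static boolean shouldReplyImmediately(long leaderHighWatermark, long replicaHighWatermark) {
        return leaderHighWatermark > replicaHighWatermark;
    }
}
```

   Under that assumption, a request that omits the field can never trigger an early reply on this condition, while a replica reporting -1 gets a reply as soon as the leader has any high-watermark of 0 or greater.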



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2882,6 +2908,7 @@ private FetchRequestData buildFetchRequest() {
                 .setLastFetchedEpoch(log.lastFetchedEpoch())
                 .setFetchOffset(log.endOffset().offset())
                 .setReplicaDirectoryId(quorum.localDirectoryId())
+                .setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))

Review Comment:
   We don't need to condition this on the MV since we're relying on the 
resolved ApiVersions. Right?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -386,6 +385,11 @@ private void onUpdateLeaderHighWatermark(
             // records still held in memory directly to the listener
             appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs);
 
+            // After updating the high-watermark, complete all of the deferred
+            // fetch requests. This is always correct because all fetch request
+            // deferred have a HWM less or equal to the previous leader's HWM.
+            fetchPurgatory.completeAll(currentTimeMs);

Review Comment:
   I want to make sure I understand why there's not a race here. If a new fetch 
arrives after we have updated the HWM (on L376), it's still okay for it to be 
completed because it's guaranteed to not have the latest HWM. Is that right?


