mumrah commented on code in PR #19800:
URL: https://github.com/apache/kafka/pull/19800#discussion_r2150719050
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1514,19 +1527,22 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
             || FetchResponse.recordsSize(partitionResponse) > 0
             || request.maxWaitMs() == 0
             || isPartitionDiverged(partitionResponse)
-            || isPartitionSnapshotted(partitionResponse)) {
+            || isPartitionSnapshotted(partitionResponse)
+            || isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
             // Reply immediately if any of the following is true
             // 1. The response contains an error
             // 2. There are records in the response
             // 3. The fetching replica doesn't want to wait for the partition to contain new data
             // 4. The fetching replica needs to truncate because the log diverged
             // 5. The fetching replica needs to fetch a snapshot
+            // 6. The fetching replica should update its high-watermark

Review Comment:
   Not specific to this PR, but when debugging deferred requests in the past, I always wished for some trace logging I could enable to see why requests were being completed. For example, if request CPU is really high and requests are not being deferred as expected, it could be due to a too-low maxWaitMs or any of the other reasons listed here. Maybe something to consider adding down the road. (See Sketch 1 below.)

##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -103,7 +105,10 @@
       { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
         "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
       { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
-        "about": "The directory id of the follower fetching." }
+        "about": "The directory id of the follower fetching." },
+      { "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807", "taggedVersions": "18+",
+        "tag": 1, "ignorable": true,
+        "about": "The high-watermark known by the replica. -1 if the high-watermark is not known." }

Review Comment:
   We should probably explain the default "9223372036854775807" (Long.MAX_VALUE) as well as the "-1" sentinel. (See Sketch 2 below.)

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2882,6 +2908,7 @@ private FetchRequestData buildFetchRequest() {
             .setLastFetchedEpoch(log.lastFetchedEpoch())
             .setFetchOffset(log.endOffset().offset())
             .setReplicaDirectoryId(quorum.localDirectoryId())
+            .setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))

Review Comment:
   We don't need to condition this on the MV, since we're relying on the resolved ApiVersions. Right? (See Sketch 3 below.)

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -386,6 +385,11 @@ private void onUpdateLeaderHighWatermark(
         // records still held in memory directly to the listener
         appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs);

+        // After updating the high-watermark, complete all of the deferred
+        // fetch requests. This is always correct because all deferred fetch
+        // requests have a HWM less than or equal to the previous leader's HWM.
+        fetchPurgatory.completeAll(currentTimeMs);

Review Comment:
   Want to make sure I understand why there's no race here. If a new fetch arrives after we have updated the HWM (on L376), it's still okay for it to be completed immediately, because it's guaranteed not to have the latest HWM. Is that right? (See Sketch 4 below.)
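Sketch 1 (re: the trace-logging comment on handleFetchRequest). A minimal illustration of what such logging could look like; the class, the CompletionReason enum, and the method below are hypothetical, not existing Kafka code:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical sketch: record why a deferred fetch was completed, so an
    // operator can see at TRACE level whether requests are completing for the
    // expected reason (e.g. a too-low maxWaitMs).
    public class DeferredFetchTrace {
        private static final Logger log = LoggerFactory.getLogger(DeferredFetchTrace.class);

        enum CompletionReason {
            ERROR, RECORDS_AVAILABLE, ZERO_MAX_WAIT, LOG_DIVERGED,
            SNAPSHOT_NEEDED, HWM_UPDATED, TIMED_OUT
        }

        static void traceCompletion(long correlationId, CompletionReason reason, long waitedMs) {
            log.trace("Completing deferred fetch correlationId={} reason={} waitedMs={}",
                correlationId, reason, waitedMs);
        }
    }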
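Sketch 2 (re: the FetchRequest.json comment). One possible expanded "about" string; the explanation of the Long.MAX_VALUE default is an assumption about the intended semantics (keeping the new HWM-based completion check inert for replicas that never send the field) and should be confirmed against the PR:

    { "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807",
      "taggedVersions": "18+", "tag": 1, "ignorable": true,
      "about": "The high-watermark known by the replica. -1 if the high-watermark is not known. The default of Long.MAX_VALUE (9223372036854775807) is used when the replica does not send this field, so the leader never considers such a replica's high-watermark to be stale." }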
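Sketch 3 (re: the ApiVersions comment on buildFetchRequest). A made-up illustration, not Kafka's generated serializer, of why the caller can set a tagged field unconditionally: the writer only emits the field when the negotiated request version supports it, so no MV check is needed at the call site.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical sketch of version-gated tagged-field serialization
    // (all names are invented).
    public class TaggedFieldSketch {
        static final short HIGH_WATERMARK_MIN_VERSION = 18; // from the JSON schema above

        static byte[] serialize(short negotiatedVersion, long highWatermark) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            // ... fixed fields for the negotiated version would be written here ...
            if (negotiatedVersion >= HIGH_WATERMARK_MIN_VERSION) {
                out.writeByte(1);             // tag 1, matching the schema
                out.writeLong(highWatermark); // tagged-field payload
            }
            return buf.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            // The caller always sets the value; the writer drops it for old versions.
            System.out.println(serialize((short) 17, 42L).length); // 0: field omitted
            System.out.println(serialize((short) 18, 42L).length); // 9: tag + int64
        }
    }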
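Sketch 4 (re: the purgatory race question). A minimal single-threaded model, not Kafka's purgatory, of the ordering argument: every fetch deferred before the high-watermark bump was deferred against the old, smaller HWM, so completing all of them on the bump is safe, and a fetch arriving after the bump compares against the new HWM and is answered immediately.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical model; assumes all methods run on a single event-loop thread,
    // so handleFetch and updateHighWatermark can never interleave.
    public class HwmPurgatorySketch {
        private long highWatermark = 0;
        private final List<CompletableFuture<Long>> deferred = new ArrayList<>();

        CompletableFuture<Long> handleFetch(long replicaHwm) {
            if (replicaHwm < highWatermark) {
                // The replica is behind: reply immediately with the newer HWM.
                return CompletableFuture.completedFuture(highWatermark);
            }
            // The replica already knows the current HWM: defer until the next bump.
            CompletableFuture<Long> future = new CompletableFuture<>();
            deferred.add(future);
            return future;
        }

        void updateHighWatermark(long newHwm) {
            highWatermark = newHwm;
            // Everything deferred so far was waiting against the old, smaller HWM,
            // so completing all of it is always correct.
            deferred.forEach(f -> f.complete(newHwm));
            deferred.clear();
        }
    }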