junrao commented on code in PR #20088:
URL: https://github.com/apache/kafka/pull/20088#discussion_r2191205175


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
      // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (recordBatchSize > 0)
         minOneMessage = false
-      limitBytes = math.max(0, limitBytes - recordBatchSize)
+      // Because we don't know yet how much data the remote fetch will retrieve, and we don't want to block the API call
+      // to query remoteLogMetadata, assume it will fetch the max bytes of data to avoid exceeding the "fetch.max.bytes" setting.
+      val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
+        readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize

Review Comment:
   Hmm, currently, a consumer fetch only processes the first partition served from remote storage. So, it seems that this change could return less data than is truly available. For example, if `maxBytes` is 2MB, `delayedRemoteStorageFetch.get.fetchMaxBytes` is 1MB, the first 2 partitions are read from remote storage, and the 3rd partition is read locally, then this change will fetch 0 bytes from the 3rd partition but return only 1MB (for the 1st partition) in the fetch response.
   
   Note that `readFromLog()` is also used by share fetch, which supports fetching from multiple partitions with remote storage. So, this code is correct for share fetch.


