adixitconfluent commented on code in PR #17870: URL: https://github.com/apache/kafka/pull/17870#discussion_r1912187645
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -390,18 +407,27 @@ private void handleFetchException( } // Visible for testing. - LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) { - LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>(); - topicPartitionData.forEach((topicIdPartition, partitionData) -> { + LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>(); + topicPartitionData.forEach((topicIdPartition, fetchOffset) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { - missingLogReadTopicPartitions.put(topicIdPartition, partitionData); + missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset); } }); if (missingLogReadTopicPartitions.isEmpty()) { return existingFetchedData; } - LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions); + + // Computing the total bytes that has already been fetched for the existing fetched data. + int totalPartitionMaxBytesUsed = 0; + for (LogReadResult logReadResult : existingFetchedData.values()) { + totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes(); + } + Review Comment: yeah, this is a slightly better version of the UNIFORM strategy. The other strategies might already depend on produce volume, so it may already be handled over there and this code might not be useful. Okay, I'll change it to a basic version where partitionMaxBytes for leftover partitions will solely depend upon acquired topic partitions size. 
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -390,18 +407,27 @@ private void handleFetchException( } // Visible for testing. - LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) { - LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>(); - topicPartitionData.forEach((topicIdPartition, partitionData) -> { + LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>(); + topicPartitionData.forEach((topicIdPartition, fetchOffset) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { - missingLogReadTopicPartitions.put(topicIdPartition, partitionData); + missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset); } }); if (missingLogReadTopicPartitions.isEmpty()) { return existingFetchedData; } - LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions); + + // Computing the total bytes that has already been fetched for the existing fetched data. + int totalPartitionMaxBytesUsed = 0; + for (LogReadResult logReadResult : existingFetchedData.values()) { + totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes(); + } + Review Comment: yeah, this is a slightly better version of the UNIFORM strategy. The other strategies might already depend on produce volume, so it may already be handled over there and this code might not be useful. Okay, I'll change it to a basic version where `partitionMaxBytes` for leftover partitions will solely depend upon acquired topic partitions size. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org