apoorvmittal10 commented on code in PR #19592: URL: https://github.com/apache/kafka/pull/19592#discussion_r2072696298
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
             // updated in a different tryComplete thread.
             responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
+            resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);

Review Comment:
   This method call seems strange in `processAcquiredTopicPartitionsForLocalLogFetch`, where we are calling a reset method for remote fetch. I am sure it is required, but can you please add comments here? Then we can see whether the name `processAcquiredTopicPartitionsForLocalLogFetch` remains valid or we need to find a better one.

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
             // updated in a different tryComplete thread.
             responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
+            resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
+
             List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
-            responseData.forEach((topicIdPartition, logReadResult) ->
-                shareFetchPartitionDataList.add(new ShareFetchPartitionData(
-                    topicIdPartition,
-                    topicPartitionData.get(topicIdPartition),
-                    logReadResult.toFetchPartitionData(false)
-                ))
-            );
+            responseData.forEach((topicIdPartition, logReadResult) -> {
+                if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {

Review Comment:
   Question: What happens if we have some `delayedRemoteStorageFetch` data in `logReadResult` now? Is it ignored or further processed?
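
To make the second question concrete, here is a minimal, self-contained sketch of one possible reading of the new guard: results whose `delayedRemoteStorageFetch` is empty go into the local list, and the rest are set aside. `ReadResult`, `Split`, and `splitByRemoteFetch` are hypothetical stand-ins invented for illustration, not Kafka classes, and collecting the set-aside partitions into a second list is an assumption; whether the PR ignores or further processes them is exactly the open question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class RemoteFetchSplitSketch {
    // Hypothetical stand-in for a per-partition read result; NOT Kafka's LogReadResult.
    record ReadResult(String records, Optional<String> delayedRemoteStorageFetch) {}

    // Hypothetical holder for the two groups produced by the split.
    record Split(List<String> localPartitions, List<String> remotePartitions) {}

    // Partitions with an empty delayedRemoteStorageFetch are treated as locally
    // readable; all others are set aside (assumption: for later remote handling).
    static Split splitByRemoteFetch(Map<String, ReadResult> responseData) {
        List<String> local = new ArrayList<>();
        List<String> remote = new ArrayList<>();
        responseData.forEach((partition, result) -> {
            if (result.delayedRemoteStorageFetch().isEmpty()) {
                local.add(partition);
            } else {
                remote.add(partition);
            }
        });
        return new Split(local, remote);
    }

    public static void main(String[] args) {
        Map<String, ReadResult> responseData = new LinkedHashMap<>();
        responseData.put("topicA-0", new ReadResult("r1", Optional.empty()));
        responseData.put("topicA-1", new ReadResult("", Optional.of("remote-fetch-info")));

        Split split = splitByRemoteFetch(responseData);
        System.out.println("local=" + split.localPartitions()
                + " remote=" + split.remotePartitions());
    }
}
```

If the set-aside partitions were simply dropped instead of collected, the caller would have no record that they still need a remote read, which is the case the question asks about.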