apoorvmittal10 commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2072696298


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
                 // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
 
+            resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);

Review Comment:
   This call looks out of place in
`processAcquiredTopicPartitionsForLocalLogFetch`, since it resets state for
remote fetch partitions. I am sure it is required, but could you please add a
comment here explaining why? Then we can see whether the name
`processAcquiredTopicPartitionsForLocalLogFetch` remains valid or whether we
need to find a better one.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
                 // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
 
+            resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
+
             List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
-            responseData.forEach((topicIdPartition, logReadResult) ->
-                shareFetchPartitionDataList.add(new ShareFetchPartitionData(
-                    topicIdPartition,
-                    topicPartitionData.get(topicIdPartition),
-                    logReadResult.toFetchPartitionData(false)
-                ))
-            );
+            responseData.forEach((topicIdPartition, logReadResult) -> {
+                if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {

Review Comment:
   Question: What happens if we have some `delayedRemoteStorageFetch` data in
`logReadResult` now? Is it ignored or processed further?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
