apoorvmittal10 commented on code in PR #20280: URL: https://github.com/apache/kafka/pull/20280#discussion_r2257007459
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -368,6 +368,12 @@ public boolean tryComplete() {
                     "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet());
             }
+            // At this point, there could be delayed requests sitting in the purgatory which are waiting on
+            // DelayedShareFetchPartitionKeys corresponding to partitions, whose leader has been changed to a different broker.
+            // In that case, such partitions would not be able to get acquired, and the tryComplete will keep on returning false.
+            // Eventually the operation will get timed out and completed, but it might not get removed from the purgatory.
+            // This has been eventually left it like this because the purge interval will make sure that the remaining operations
+            // in the purgatory do not grow indefinitely and are purged time to time.

Review Comment:
   Is the purge periodic ("time to time"), or is it triggered when the number of keys or operations in the purgatory exceeds a configured limit? Please be explicit in the comment.

##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1761,6 +1859,16 @@ public void testRemoteStorageFetchHappensForAllTopicPartitions() {
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);

Review Comment:
   Did we add a new test case that validates the added functionality?
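   To make the ask concrete, here is a rough, dependency-free sketch of the behaviour such a test could pin down: one partition still reports leadership, the other does not, and only the second should end up with an error. Everything in it is hypothetical and for illustration only: `Partition` is a stand-in interface rather than `kafka.cluster.Partition`, `LeadershipLostException` is a placeholder for whichever exception the PR settles on, and `checkLeadership` models only the leader check, not the rest of the remote fetch handling. A real test would presumably use the Mockito mocks already present in `DelayedShareFetchTest`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LeaderCheckSketch {
    /** Hypothetical stand-in for a partition; only isLeader() is modelled. */
    interface Partition {
        boolean isLeader();
    }

    /** Hypothetical placeholder for the exception raised when leadership has moved. */
    static class LeadershipLostException extends RuntimeException {
        LeadershipLostException(String message) {
            super(message);
        }
    }

    /**
     * Checks every partition and returns the failures keyed by partition name.
     * Partitions that are still led by this broker do not appear in the result.
     */
    static Map<String, RuntimeException> checkLeadership(Map<String, Partition> partitions) {
        Map<String, RuntimeException> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Partition> entry : partitions.entrySet()) {
            try {
                if (!entry.getValue().isLeader()) {
                    throw new LeadershipLostException(
                        "Broker is no longer the leader of topicPartition: " + entry.getKey());
                }
            } catch (RuntimeException e) {
                // Record the failure per partition instead of aborting the whole loop.
                errors.put(entry.getKey(), e);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Partition> partitions = new LinkedHashMap<>();
        partitions.put("tp0", () -> true);   // still the leader
        partitions.put("tp1", () -> false);  // leadership moved to another broker

        Map<String, RuntimeException> errors = checkLeadership(partitions);
        if (errors.size() != 1 || !errors.containsKey("tp1")) {
            throw new AssertionError("expected exactly tp1 to fail the leader check");
        }
        System.out.println("Partitions failing the leader check: " + errors.keySet());
    }
}
```

   The point of the two-partition setup is that the test would fail both if the check is missing (no error for `tp1`) and if it is too broad (an error for `tp0`).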
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -765,7 +771,10 @@ private boolean maybeCompletePendingRemoteFetch() {
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicPartition: " + topicIdPartition);

Review Comment:
   Are you certain the partition is also not a follower? If not, should we use NotLeaderException instead?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org