apoorvmittal10 commented on code in PR #20280:
URL: https://github.com/apache/kafka/pull/20280#discussion_r2257007459


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -368,6 +368,12 @@ public boolean tryComplete() {
                         "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
                     sharePartitions.keySet());
             }
+            // At this point, there could be delayed requests sitting in the purgatory which are waiting on
+            // DelayedShareFetchPartitionKeys corresponding to partitions, whose leader has been changed to a different broker.
+            // In that case, such partitions would not be able to get acquired, and the tryComplete will keep on returning false.
+            // Eventually the operation will get timed out and completed, but it might not get removed from the purgatory.
+            // This has been eventually left it like this because the purge interval will make sure that the remaining operations
+            // in the purgatory do not grow indefinitely and are purged time to time.

Review Comment:
   Are operations purged from time to time, or only when the number of keys or
   operations in the purgatory exceeds a configured threshold? Can you please be
   explicit in the comment?
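
   To illustrate the second possibility in the question above, here is a minimal
   sketch, assuming a threshold-based purge. `PurgatorySketch`, `Op`, and
   `purgeInterval` are hypothetical names used only for illustration; they are
   not Kafka's actual classes or config keys, and this does not claim to show
   how the real purgatory behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a purgatory (not Kafka's implementation).
class PurgatorySketch {
    // Hypothetical operation that only tracks whether it has completed.
    static final class Op {
        boolean completed;
    }

    private final List<Op> watched = new ArrayList<>();
    // Assumed threshold, playing the role of a purge-interval setting.
    private final int purgeInterval;

    PurgatorySketch(int purgeInterval) {
        this.purgeInterval = purgeInterval;
    }

    void watch(Op op) {
        watched.add(op);
    }

    int watchedCount() {
        return watched.size();
    }

    // Purges only when the number of completed-but-still-watched operations
    // exceeds the threshold; returns how many operations were removed.
    int maybePurge() {
        long completed = watched.stream().filter(op -> op.completed).count();
        if (completed <= purgeInterval) {
            return 0;
        }
        int before = watched.size();
        watched.removeIf(op -> op.completed);
        return before - watched.size();
    }
}
```

   In this model, completed operations can linger below the threshold, which is
   the distinction the code comment would need to spell out.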



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1761,6 +1859,16 @@ public void testRemoteStorageFetchHappensForAllTopicPartitions() {
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
 
         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);

Review Comment:
   Did we add a new test case that validates the added functionality?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -765,7 +771,10 @@ private boolean maybeCompletePendingRemoteFetch() {
 
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicPartition: " + topicIdPartition);

Review Comment:
   Are you certain the partition is also not a follower? If not, shall we use
   NotLeaderException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

