apoorvmittal10 commented on code in PR #20280:
URL: https://github.com/apache/kafka/pull/20280#discussion_r2262923031


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -757,30 +766,37 @@ private void processRemoteFetchOrException(
      * Case a: The partition is in an offline log directory on this broker
      * Case b: This broker does not know the partition it tries to fetch
      * Case c: This broker is no longer the leader of the partition it tries to fetch
-     * Case d: All remote storage read requests completed
+     * Case d: This broker is no longer the leader or follower of the partition it tries to fetch
+     * Case e: All remote storage read requests completed
      * @return boolean representing whether the remote fetch is completed or not.
      */
     private boolean maybeCompletePendingRemoteFetch() {
         boolean canComplete = false;
 
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition);
+                }
             } catch (KafkaStorageException e) { // Case a
                 log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
             } catch (UnknownTopicOrPartitionException e) { // Case b
                 log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
-            } catch (NotLeaderOrFollowerException e) { // Case c
+            } catch (NotLeaderException e) { // Case c
                 log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());

Review Comment:
   The log text is incorrect for this branch. "No longer the leader or follower" describes `NotLeaderOrFollowerException`, but this block now catches `NotLeaderException`.


