AndrewJSchofield commented on code in PR #20280:
URL: https://github.com/apache/kafka/pull/20280#discussion_r2246000180


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -765,15 +772,18 @@ private boolean maybeCompletePendingRemoteFetch() {
 
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition);

Review Comment:
   Maybe use "topicPartition" in this message, to align with the rest of this code.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -765,15 +772,18 @@ private boolean maybeCompletePendingRemoteFetch() {
 
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition);
+                }
             } catch (KafkaStorageException e) { // Case a
                 log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
             } catch (UnknownTopicOrPartitionException e) { // Case b
                 log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
             } catch (NotLeaderOrFollowerException e) { // Case c
-                log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                log.debug("Broker is no longer the leader or follower of topicIdPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());

Review Comment:
   I'd revert this line, because all of the other log lines just call it a
topicPartition even though they technically have the topic ID. That's fine,
because it is just an identifier for a topic partition.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -368,6 +368,13 @@ public boolean tryComplete() {
                         "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
                     sharePartitions.keySet());
             }
+            for (TopicIdPartition topicIdPartition : sharePartitions.keySet()) {
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    log.error("Broker is no longer the leader of topicIdPartition {}", topicIdPartition);
+                    throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " +  topicIdPartition);

Review Comment:
   "topicPartition" :)


