AndrewJSchofield commented on code in PR #20280: URL: https://github.com/apache/kafka/pull/20280#discussion_r2246000180
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -765,15 +772,18 @@ private boolean maybeCompletePendingRemoteFetch() { for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) { try { - replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + if (!partition.isLeader()) { + throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition); Review Comment: Maybe "topicPartition" is more aligned with the rest of this code. ########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -765,15 +772,18 @@ private boolean maybeCompletePendingRemoteFetch() { for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) { try { - replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + if (!partition.isLeader()) { + throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition); + } } catch (KafkaStorageException e) { // Case a log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); canComplete = true; } catch (UnknownTopicOrPartitionException e) { // Case b log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); canComplete = true; } catch (NotLeaderOrFollowerException e) { // Case c - log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + log.debug("Broker is no longer the leader or follower of topicIdPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); Review Comment: I'd revert this line because all of the other log lines just call it a topicPartition, in spite of technically having the topic ID. That's really fine because it is just an identifier for a topic partition. ########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -368,6 +368,13 @@ public boolean tryComplete() { "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet()); } + for (TopicIdPartition topicIdPartition : sharePartitions.keySet()) { + Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + if (!partition.isLeader()) { + log.error("Broker is no longer the leader of topicIdPartition {}", topicIdPartition); + throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition); Review Comment: "topicPartition" :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org