junrao commented on code in PR #17700: URL: https://github.com/apache/kafka/pull/17700#discussion_r1924211525
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
 
+    /**
+     * Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
+     * no position exists, an {@link IllegalStateException} is thrown.
+     */
+    private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
+        SubscriptionState.FetchPosition position = subscriptions.position(partition);
+
+        if (position == null)
+            throw new IllegalStateException("Missing position for fetchable partition " + partition);
+
+        return position;
+    }
+
+    /**
+     * Retrieves the node from which to fetch the partition data. If the given
+     * {@link SubscriptionState.FetchPosition position} does not have a current
+     * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}.
+     *
+     * @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
+     *         {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
+     *         {@link Metadata.LeaderAndEpoch#leader leader}
+     */
+    private Optional<Node> maybeNodeForPosition(TopicPartition partition,
+                                                SubscriptionState.FetchPosition position,
+                                                long currentTimeMs) {
+        Optional<Node> leaderOpt = position.currentLeader.leader;
+
+        if (leaderOpt.isEmpty()) {
+            log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
+            metadata.requestUpdate(false);

Review Comment:
   Where do we set `position.currentLeader` to empty? Should `metadata.requestUpdate` be called there instead?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org