kirktrue commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1924311418


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests()
         return 
fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().build()));
     }
 
+    /**
+     * Simple utility method that returns a {@link 
SubscriptionState.FetchPosition position} for the partition. If
+     * no position exists, an {@link IllegalStateException} is thrown.
+     */
+    private SubscriptionState.FetchPosition 
positionForPartition(TopicPartition partition) {
+        SubscriptionState.FetchPosition position = 
subscriptions.position(partition);
+
+        if (position == null)
+            throw new IllegalStateException("Missing position for fetchable 
partition " + partition);
+
+        return position;
+    }
+
+    /**
+     * Retrieves the node from which to fetch the partition data. If the given
+     * {@link SubscriptionState.FetchPosition position} does not have a current
+     * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will 
return {@link Optional#empty()}.
+     *
+     * @return Three options: 1) {@link Optional#empty()} if the position's 
leader is empty, 2) the
+     * {@link #selectReadReplica(TopicPartition, Node, long) read replica, if 
defined}, or 3) the position's
+     * {@link Metadata.LeaderAndEpoch#leader leader}
+     */
+    private Optional<Node> maybeNodeForPosition(TopicPartition partition,
+                                                
SubscriptionState.FetchPosition position,
+                                                long currentTimeMs) {
+        Optional<Node> leaderOpt = position.currentLeader.leader;
+
+        if (leaderOpt.isEmpty()) {
+            log.debug("Requesting metadata update for partition {} since the 
position {} is missing the current leader node", partition, position);
+            metadata.requestUpdate(false);

Review Comment:
   Since `currentLeader` is immutable, it's only ever updated when a new 
`FetchPosition` is created and set within the `SubscriptionState`. There aren't 
many places where a `FetchPosition` is created with the `currentLeader` set 
explicitly to empty. The two places I see are:
   
   1. `Consumer.seek(TopicPartition, long)` calls into 
`SubscriptionState.seek(TopicPartition, long)` and creates a new 
`FetchPosition` without a leader
   2. `Metadata.currentLeader(TopicPartition)` creates a new `FetchPosition` 
without a leader when it doesn't have any partition metadata for the partition
   
   In the first case, I believe that finding the leader is intentional delayed 
to the call to `Consumer.poll()` where the offsets and leaders will be 
refreshed for any partitions where they're lacking.
   
   I'm curious about this design as well. I'll keep digging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to