junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1924510229


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = 
subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = 
Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for 
fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally 
buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new 
fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before 
attempting to retrieve the node
+            // information, check that the partition is still assigned as 
calling  SubscriptionState.position() on an
+            // unassigned partition will throw an IllegalStateException.
+            //
+            // Note: this check is not needed for the unbuffered partitions as 
the logic in
+            // SubscriptionState.fetchablePartitions() only includes 
partitions currently assigned.
+            if (!subscriptions.isAssigned(partition))

Review Comment:
   @kirktrue : Thanks for the explanation. In 
[FetchCollector.fetchRecords()](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L156-L158),
 if a partition is unassigned, we will call `nextInLineFetch.drain()` at the 
end to drain the fetched data for that partition. So, in the common case, the 
buffered data for unassigned partition will be drained before the partition is 
assigned back again. 
   
   In the rare case, in theory, the following seems possible (1) partition1 is 
assigned to client1; (2) a CompletedFetch for partition1 is buffered in the 
client1; (3) partition1 is reassigned to client2; (4) client2 consumes the same 
data buffered in step (2); (5) partition1 is reassigned back to client2; (6) 
client1 calls poll() and consumes the data buffered in step 2, causing 
duplicated data to be returned to the client. Since this is an existing and 
rare issue, we can investigate in a separate jira.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to