jeffkbkim commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1915199671


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3437,6 +3445,181 @@ public void testPollWithRedundantCreateFetchRequests() {
 
     }
 
+    @Test
+    public void testFetchRequestWithBufferedPartitions() {
+        buildFetcher();
+
+        int numNodes = 2;
+        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2, tp3);
+        assignFromUser(partitions, numNodes);
+
+        // Seek each partition so that it becomes eligible to fetch.
+        partitions.forEach(tp -> subscriptions.seek(tp, 0));
+
+        List<Node> nodes = partitions.stream()
+            .map(tp -> metadata.fetch().leaderFor(tp))
+            .filter(Objects::nonNull)
+            .distinct()
+            .collect(Collectors.toList());
+        assertEquals(numNodes, nodes.size(), "The number of nodes in the 
cluster is incorrect");
+
+        Node node0 = nodes.get(0);
+        Node node1 = nodes.get(1);
+        List<TopicPartition> node0Partitions = partitionsForNode(node0, 
partitions);
+        List<TopicPartition> node1Partitions = partitionsForNode(node1, 
partitions);
+
+        // sendFetches() call #1.
+        assertEquals(
+            2,
+            sendFetches(),
+            "The first fetch should issue requests to node 0 or node 1 since 
neither has buffered data"
+        );
+        prepareFetchResponses(node0, node0Partitions, buildRecords(0, 10, 1));
+        prepareFetchResponses(node1, node1Partitions, buildRecords(0, 10, 1));
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+        assertCollected(node0Partitions.remove(0), partitions);
+        assertEquals(3, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #2.
+        assertEquals(
+            0,
+            sendFetches(),
+            "The second fetch shouldn't issue requests to either node 0 or 
node 1 since they both have buffered data"
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertCollected(node1Partitions.remove(0), partitions);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // sendFetches() call #3.
+        assertEquals(
+            0,
+            sendFetches(),
+            "The third fetch shouldn't issue requests to either node 0 or node 
1 since they both have buffered data"
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertCollected(node0Partitions.remove(0), partitions);
+        assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Node 0's partitions have all been collected, so validate that and 
then reset the list of partitions
+        // from which to fetch data so the next pass should request can fetch 
more data.
+        assertTrue(node0Partitions.isEmpty());
+        node0Partitions = partitionsForNode(node0, partitions);
+
+        // sendFetches() call #4.
+        assertEquals(
+            1,
+            sendFetches(),
+            "The fourth fetch should issue a request to node 0 since its 
buffered data was collected"
+        );
+        prepareFetchResponses(node0, node0Partitions, buildRecords(10, 10, 1));
+        networkClientDelegate.poll(time.timer(0));
+        assertCollected(node1Partitions.remove(0), partitions);
+        assertEquals(2, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        // Node 1's partitions have likewise all been collected, so validate 
that and reset.
+        assertTrue(node1Partitions.isEmpty());
+        node1Partitions = partitionsForNode(node1, partitions);
+
+        // sendFetches() call #5.
+        assertEquals(
+            1,
+            sendFetches(),
+            "The fifth fetch should issue a request to node 1 since its 
buffered data was collected"
+        );
+        prepareFetchResponses(node1, node1Partitions, buildRecords(10, 10, 1));
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        assertEquals(
+            partitions,
+            fetchRecords().keySet(),
+            "Records from all partitions should have been collected"
+        );
+
+        assertEquals(
+            0,
+            fetcher.fetchBuffer.bufferedPartitions().size(),
+            "There was still data remaining in the fetch buffer"
+        );
+
+        // sendFetches() call #6.
+        assertEquals(
+            2,
+            sendFetches(),
+            "The sixth fetch should issue a request to nodes 0 and 1 since its 
buffered data was collected"
+        );
+        prepareFetchResponses(node0, node0Partitions, buildRecords(20, 10, 1));
+        prepareFetchResponses(node1, node1Partitions, buildRecords(20, 10, 1));
+        networkClientDelegate.poll(time.timer(0));
+        assertEquals(4, fetcher.fetchBuffer.bufferedPartitions().size());
+
+        assertEquals(
+            partitions,
+            fetchRecords().keySet(),
+            "Records from all partitions should have been collected"
+        );
+
+        assertEquals(
+            0,
+            fetcher.fetchBuffer.bufferedPartitions().size(),
+            "There was still data remaining in the fetch buffer"
+        );
+    }
+
+    private List<TopicPartition> partitionsForNode(Node node, 
Set<TopicPartition> partitions) {
+        int nodesPerPartition = 2;

Review Comment:
   nit: this should be `partitionsPerNode`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = 
Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = 
Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally 
buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
+
+        Set<Integer> nodesWithBufferedPartitions = 
nodesWithBufferedPartitions(buffered, currentTimeMs);

Review Comment:
   Have we seen an impact on consumer CPU?
   
   Also, I was wondering whether getting the nodes for buffered partitions is 
something we could pre-process while updating fetchBuffer. I'm not sure that would 
actually improve performance, but I guess it would still help since we would only process it once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to