showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854844189


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+        // make sure in next run, the drain index will start from the beginning
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+        // test the contine case, mute the tp4 and drain batches from 2nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
+        // add record for tp2, tp3, tp4  mute the tp4
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.mutePartition(tp4);
+        Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);

Review Comment:
   add the above comment here, and change it to
   `drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)`
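   For illustration, a minimal sketch of how that block could read once the comment is moved and reworded as suggested. The assertion on `batches4` is not visible in this hunk, so the expected `tp2`/`tp3` check below is an assumption based on the review note:
   ```java
   // add record for tp2, tp3, tp4, then mute tp4
   accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
   accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
   accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
   accum.mutePartition(tp4);

   // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
   Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
   // expected result per the review note above (the actual assertion line is not shown in this hunk)
   verifyTopicPartitionInBatches(batches4, tp2, tp3);
   ```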



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);
+    }
+
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());

Review Comment:
   need a space after the comma (,)
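   For reference, a sketch of the helper with the spacing fixed. The body after the size check is not shown in this hunk, so the per-partition verification below is an assumption (it also assumes `ProducerBatch.topicPartition` is visible to this same-package test):
   ```java
   private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
       // one drained batch list per node, one expected partition per node
       assertEquals(tp.length, batches.size());
       // hypothetical remainder: collect the drained partitions and check each expected one is present
       List<TopicPartition> drainedPartitions = new ArrayList<>();
       for (List<ProducerBatch> batchList : batches.values()) {
           for (ProducerBatch batch : batchList) {
               drainedPartitions.add(batch.topicPartition);
           }
       }
       for (TopicPartition topicPartition : tp) {
           assertTrue(drainedPartitions.contains(topicPartition));
       }
   }
   ```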



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+        // make sure in next run, the drain index will start from the beginning
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+        // test the contine case, mute the tp4 and drain batches from 2nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
+        // add record for tp2, tp3, tp4  mute the tp4

Review Comment:
   remove the 1st comment: `// test the contine case,...`



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -559,13 +559,14 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         int size = 0;
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
-        /* to make starvation less likely this loop doesn't start at 0 */
+        /* to make starvation less likely each node has it's own drainIndex */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-            this.drainIndex = (this.drainIndex + 1) % parts.size();
-
+            updateDrainIndex(node.idString(), drainIndex);

Review Comment:
   Thanks for the explanation. Makes sense.
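   For readers following along: the call sites above imply a small per-node index map keyed by `node.idString()`. A minimal sketch of what the two helpers could look like, assuming a `nodesDrainIndex` map field (the helper bodies are not part of this hunk, so the names and details below are assumptions):
   ```java
   // hypothetical sketch: one drain index per node, keyed by the node id string
   private final Map<String, Integer> nodesDrainIndex = new HashMap<>();

   private int getDrainIndex(String idString) {
       // a node seen for the first time starts draining at partition 0
       return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
   }

   private void updateDrainIndex(String idString, int drainIndex) {
       // remember where this node's next drain should resume
       nodesDrainIndex.put(idString, drainIndex);
   }
   ```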


