showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854733717
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+            Collections.emptySet(), Collections.emptySet());
+
+        // initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);

Review Comment:
   I think we can add another line to make sure it picks tp1, tp3 in the next run.
That is:
```java
// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
// The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batchss2, tp2, tp4);

// make sure in next run, the drain index will start from the beginning
Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches3, tp1, tp3);
```

########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);

Review Comment:
   typo: `batchss2` -> `batches2`

########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##########
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+

Review Comment:
   Please remove this line. Thanks.
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());
+        List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
+            List<ProducerBatch> batchList = entry.getValue();
+            assertEquals(batchList.size(), 1);

Review Comment:
   The method semantics are `assertEquals(expected, actual)`, so we should do `assertEquals(1, batchList.size());`, right?
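   As a side note, here is a minimal generic illustration of why the argument order matters (plain JUnit 5 code, not taken from this PR): a swapped call still passes when the values match, but on failure the message reports the two values backwards.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.List;

class AssertOrderExample {
    void checkBatchCount() {
        // Suppose a regression leaves the batch list empty.
        List<String> batchList = Collections.emptyList();

        // Swapped arguments: JUnit fails with "expected: <0> but was: <1>",
        // which reads as if the test wanted zero batches.
        // assertEquals(batchList.size(), 1);

        // Correct order: JUnit fails with "expected: <1> but was: <0>",
        // matching the test's actual intent of exactly one batch.
        assertEquals(1, batchList.size());
    }
}
```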
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
+        for (int i = 0 ; i < tp.length ; i++){
+            assertEquals(topicPartitionsInBatch.get(i), tp[i]);

Review Comment:
   Same here: `assertEquals(tp[i], topicPartitionsInBatch.get(i));`

########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());

Review Comment:
   nit: add a space after the comma (,)

########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ##########
@@ -559,13 +560,12 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         int size = 0;
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
-        /* to make starvation less likely this loop doesn't start at 0 */
+        /* to make starvation less likely each node has it's own drainIndex */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-            this.drainIndex = (this.drainIndex + 1) % parts.size();
-
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;

Review Comment:
   On a second look, I found this change is not right. If the tp is muted (or skipped for any other reason), it `continue`s without advancing the index, and we'll have an infinite loop. Please fix it and add a test for this case. Thanks.
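   For reference, a minimal sketch of the direction such a fix could take, with assumed names (`PerNodeDrainIndexSketch`, `nodesDrainIndex`) and simplified types standing in for the accumulator's internals; this is not the PR's actual code. The key point is advancing the per-node index before any `continue`, so each partition is visited at most once per drain and a muted partition cannot pin the loop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch only: assumed names and simplified types, not the PR's code.
class PerNodeDrainIndexSketch {
    // One drain index per node id, mirroring the getDrainIndex(node.idString()) call in the diff.
    private final Map<String, Integer> nodesDrainIndex = new HashMap<>();

    private int getDrainIndex(String nodeId) {
        return nodesDrainIndex.computeIfAbsent(nodeId, id -> 0);
    }

    // Stand-in for drainBatchesForOneNode: `parts` plays the role of the node's
    // partition list, `isMuted` the in-flight check.
    List<String> drain(String nodeId, List<String> parts, Predicate<String> isMuted) {
        List<String> ready = new ArrayList<>();
        int drainIndex = getDrainIndex(nodeId);
        int start = drainIndex = drainIndex % parts.size();
        do {
            String tp = parts.get(drainIndex);
            // Advance before any `continue`. The removed line in the diff,
            // this.drainIndex = (this.drainIndex + 1) % parts.size(), did this
            // for the old shared index; dropping it is what lets the loop
            // spin forever on a muted partition.
            drainIndex = (drainIndex + 1) % parts.size();
            nodesDrainIndex.put(nodeId, drainIndex);
            if (isMuted.test(tp))
                continue;
            ready.add(tp); // stand-in for draining the deque for tp
        } while (start != drainIndex);
        return ready;
    }
}
```

   With this shape, one full pass over `parts` is the worst case even when every partition is muted, because the exit condition compares against the already-advanced index.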
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ##########
@@ -98,6 +98,54 @@ public void teardown() {
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());

Review Comment:
   We don't need to verify the size because we'll verify it in `verifyTopicPartitionInBatches`, right?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org