chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632355536
########## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ########## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { Review Comment: could you please rename it to `testIterator`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ########## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { + String topic = "topic"; + int recordSize = 10; + int partitionSize = 15; + int emptyPartitionIndex = 3; + ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); + Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator(); - Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>(); + int recordCount = 0; + int partitionCount = 0; + int currentPartition = -1; + + while (iterator.hasNext()) { + ConsumerRecord<Integer, String> record = iterator.next(); + + if (record.partition() == emptyPartitionIndex) { + fail("Partition " + emptyPartitionIndex + " is not empty"); + } + + // Check if we have moved to a new partition + if (currentPartition != record.partition()) { + // Increment the partition count as we have encountered a new partition + partitionCount++; + // Update the current partition to the new partition + currentPartition = record.partition(); + } - String topic = "topic"; - records.put(new TopicPartition(topic, 0), new ArrayList<>()); - ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, - 0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); - ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, - 0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); - records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); - records.put(new TopicPartition(topic, 2), new ArrayList<>()); - - ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records); - Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator(); - - int c = 0; - for (; iter.hasNext(); c++) { - ConsumerRecord<Integer, String> record = iter.next(); - assertEquals(1, record.partition()); assertEquals(topic, record.topic()); - assertEquals(c, record.offset()); + assertEquals(currentPartition, record.partition()); + assertEquals(recordCount % recordSize, record.offset()); + assertEquals(recordCount % recordSize, record.key()); + assertEquals(String.valueOf(recordCount % recordSize), record.value()); + + recordCount++; } - assertEquals(2, c); + + // Including empty partition + assertEquals(partitionSize, partitionCount + 1); + } + + @Test + public void testRecordsWithNullTopic() { + String nullTopic = null; + ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); + assertEquals("Topic must be non-null.", exception.getMessage()); + } + + + @Test + public void testRecords() { Review Comment: Could you add test for `records(TopicPartition)`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ########## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { + String topic = "topic"; + int recordSize = 10; + int partitionSize = 15; + int emptyPartitionIndex = 3; + ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); + Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator(); - Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>(); + int recordCount = 0; + int partitionCount = 0; + int currentPartition = -1; + + while (iterator.hasNext()) { + ConsumerRecord<Integer, String> record = iterator.next(); + + if (record.partition() == emptyPartitionIndex) { + fail("Partition " + emptyPartitionIndex + " is not empty"); + } + + // Check if we have moved to a new partition + if (currentPartition != record.partition()) { + // Increment the partition count as we have encountered a new partition + partitionCount++; + // Update the current partition to the new partition + currentPartition = record.partition(); + } - String topic = "topic"; - records.put(new TopicPartition(topic, 0), new ArrayList<>()); - ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, - 0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); - ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, - 0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); - records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); - records.put(new TopicPartition(topic, 2), new ArrayList<>()); - - ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records); - Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator(); - - int c = 0; - for (; iter.hasNext(); c++) { - ConsumerRecord<Integer, String> record = iter.next(); - assertEquals(1, record.partition()); assertEquals(topic, record.topic()); - assertEquals(c, record.offset()); + assertEquals(currentPartition, record.partition()); + assertEquals(recordCount % recordSize, record.offset()); + assertEquals(recordCount % recordSize, record.key()); + assertEquals(String.valueOf(recordCount % recordSize), record.value()); + + recordCount++; } - assertEquals(2, c); + + // Including empty partition + assertEquals(partitionSize, partitionCount + 1); + } + + @Test + public void testRecordsWithNullTopic() { + String nullTopic = null; + ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); + assertEquals("Topic must be non-null.", exception.getMessage()); + } + + + @Test + public void testRecords() { + List<String> topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); + int recordSize = 3; + int partitionSize = 10; + int emptyPartitionIndex = 6; + int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + + ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + + for (String topic : topics) { + Iterable<ConsumerRecord<Integer, String>> records = consumerRecords.records(topic); + Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator(); + int recordCount = 0; + int partitionCount = 0; + int currentPartition = -1; + + while (iterator.hasNext()) { + ConsumerRecord<Integer, String> record = iterator.next(); + + if (record.partition() == emptyPartitionIndex) { + fail("Partition " + emptyPartitionIndex + " is not empty"); + } + + // Check if we have moved to a new partition + if (currentPartition != record.partition()) { + // Increment the partition count as we have encountered a new partition + partitionCount++; + // Update the current partition to the new partition + currentPartition = record.partition(); + } + + assertEquals(topic, record.topic()); Review Comment: those checks are existent in both test cases. Maybe we can reuse them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org