junrao commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r2047838466
########## core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala: ########## @@ -85,4 +85,80 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic()) } + /** + * Tests that Producer produce to new topic id after recreation. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the metadata has been updated with new topic id. + */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testSendWithRecreatedTopic(quorum: String): Unit = { + val numRecords = 10 + val topic = "topic" + createTopic(topic) + val admin = createAdminClient() + val topicId = topicMetadata(admin, topic).topicId() + val producer = createProducer() + + (1 to numRecords).foreach { i => + val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, resp.topic()) + } + // Start topic deletion + deleteTopic(topic, listenerName) + + // Verify that the topic is deleted when no metadata request comes in + TestUtils.verifyTopicDeletion(topic, 2, brokers) + createTopic(topic) + assertNotEquals(topicId, topicMetadata(admin, topic).topicId()) + + // Producer should be able to send messages even after topic gets recreated + val recordMetadata: RecordMetadata = producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, recordMetadata.topic()) + assertEquals(0, recordMetadata.offset()) + } + + /** + * Tests that Producer produce to topic during reassignment where topic metadata change on broker side. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the metadata cache on the leader includes the partition topic id. 
+ */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = { + val numRecords = 10 + val topic = "topic" + val partition0: TopicPartition = new TopicPartition(topic, 0) + val admin: Admin = createAdminClient() + + // Create topic with leader as 0 for the 1 partition. + createTopicWithAssignment(topic, Map(0 -> Seq(0))) + TestUtils.assertLeader(admin, partition0, 0) + + val topicDetails = topicMetadata(admin, topic) + val producer = createProducer() + + (1 to numRecords).foreach { i => + val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, resp.topic()) + } + + val reassignment = Map( + partition0 -> Optional.of(new NewPartitionReassignment(util.Arrays.asList(2))), + ) + + // Change assignment of one of the replicas from 0 to 2. Leadership moves be . Review Comment: > Leadership moves be . This comment is an incomplete sentence — could you complete it so the intended leadership behavior is clear? ########## core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala: ########## @@ -85,4 +85,80 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic()) } + /** + * Tests that Producer produce to new topic id after recreation. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the metadata has been updated with new topic id. 
+ */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testSendWithRecreatedTopic(quorum: String): Unit = { + val numRecords = 10 + val topic = "topic" + createTopic(topic) + val admin = createAdminClient() + val topicId = topicMetadata(admin, topic).topicId() + val producer = createProducer() + + (1 to numRecords).foreach { i => + val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, resp.topic()) + } + // Start topic deletion + deleteTopic(topic, listenerName) + + // Verify that the topic is deleted when no metadata request comes in + TestUtils.verifyTopicDeletion(topic, 2, brokers) + createTopic(topic) + assertNotEquals(topicId, topicMetadata(admin, topic).topicId()) + + // Producer should be able to send messages even after topic gets recreated + val recordMetadata: RecordMetadata = producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, recordMetadata.topic()) + assertEquals(0, recordMetadata.offset()) + } + + /** + * Tests that Producer produce to topic during reassignment where topic metadata change on broker side. + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the metadata cache on the leader includes the partition topic id. + */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = { + val numRecords = 10 + val topic = "topic" + val partition0: TopicPartition = new TopicPartition(topic, 0) + val admin: Admin = createAdminClient() + + // Create topic with leader as 0 for the 1 partition. 
+ createTopicWithAssignment(topic, Map(0 -> Seq(0))) + TestUtils.assertLeader(admin, partition0, 0) + + val topicDetails = topicMetadata(admin, topic) + val producer = createProducer() + + (1 to numRecords).foreach { i => + val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get + assertEquals(topic, resp.topic()) + } + + val reassignment = Map( + partition0 -> Optional.of(new NewPartitionReassignment(util.Arrays.asList(2))), Review Comment: Could we reassign to broker 1? Then, we could just use 2 brokers instead of 3 for the test. ########## core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala: ########## @@ -133,11 +134,12 @@ class ProduceRequestTest extends BaseRequestTest { } val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp, Compression.gzip().build()) - val topicPartition = new TopicPartition("topic", partition) + val topicPartition = new TopicIdPartition(topicDescription.topicId(), partition, "topic") val produceResponse = sendProduceRequest(leader, ProduceRequest.builder(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( new ProduceRequestData.TopicProduceData() .setName(topicPartition.topic()) + .setTopicId(topicPartition.topicId()) Review Comment: We could remove `setName`, right? Ditto below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org