OmniaGM commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1867123595
########## clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java: ########## @@ -2564,41 +2562,56 @@ private ProduceRequest createProduceRequest(short version) { .setAcks((short) -1) .setTimeoutMs(123) .setTopicData(new ProduceRequestData.TopicProduceDataCollection(singletonList( - new ProduceRequestData.TopicProduceData() - .setName("topic1") - .setPartitionData(singletonList(new ProduceRequestData.PartitionProduceData() - .setIndex(1) - .setRecords(records)))).iterator())); + createTopicProduceData(version, records, new TopicIdPartition(Uuid.ZERO_UUID, 1, "topic1")) + ).iterator())); return new ProduceRequest.Builder(version, version, data).build(version); } + byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2; MemoryRecords records = MemoryRecords.withRecords(magic, Compression.NONE, new SimpleRecord("woot".getBytes())); + TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "test"); return ProduceRequest.forMagic(magic, new ProduceRequestData() - .setTopicData(new ProduceRequestData.TopicProduceDataCollection(singletonList( - new ProduceRequestData.TopicProduceData() - .setName("test") - .setPartitionData(singletonList(new ProduceRequestData.PartitionProduceData() - .setIndex(0) - .setRecords(records)))).iterator())) + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + singletonList(createTopicProduceData(version, records, topicIdPartition)).iterator() + )) .setAcks((short) 1) .setTimeoutMs(5000) .setTransactionalId(version >= 3 ? "transactionalId" : null)) .build(version); } + private static ProduceRequestData.TopicProduceData createTopicProduceData(short version, MemoryRecords records, TopicIdPartition tp) { + ProduceRequestData.TopicProduceData topicProduceData = new ProduceRequestData.TopicProduceData() + .setPartitionData(singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition()) + .setRecords(records))); + if (version >= 12) { + topicProduceData.setTopicId(tp.topicId()); + } else { + topicProduceData.setName(tp.topic()); + } + return topicProduceData; + } + + private static TopicIdPartition createTopicIdPartition(Uuid topicId, int partitionIndex) { + return new TopicIdPartition(topicId, partitionIndex, ""); Review Comment: If I recall correctly it was because topic name didn't matter as the new version wouldn't relay on topic name but now that I am looking at the code this empty string is confusing. I'll update it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org