chia7712 commented on code in PR #18889:
URL: https://github.com/apache/kafka/pull/18889#discussion_r1957459755


##########
core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala:
##########
@@ -18,16 +18,43 @@ package kafka.api
 
 import kafka.utils.TestInfoUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.compress.Compression
+import org.apache.kafka.common.record.{AbstractRecords, CompressionType, 
MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
+import java.nio.ByteBuffer
 import java.util
 import java.util.{Collections, Optional}
 import scala.jdk.CollectionConverters._
 
 class ConsumerWithLegacyMessageFormatIntegrationTest extends 
AbstractConsumerTest {
 
+  private def appendLegacyRecords(numRecords: Int, tp: TopicPartition, 
startingTimestamp: Long, brokerId: Int): Unit = {
+    val records = (0 until numRecords).map { i =>
+      new SimpleRecord(startingTimestamp + i, s"key $i".getBytes, s"value 
$i".getBytes)
+    }
+    val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(RecordBatch.MAGIC_VALUE_V1,
 CompressionType.NONE, records.asJava))
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, 
Compression.of(CompressionType.NONE).build,

Review Comment:
   Could you please add a test for v0 as well? In v0 we don't write the timestamp,
so searching for a record by timestamp should return null.



##########
core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala:
##########
@@ -80,12 +114,12 @@ class ConsumerWithLegacyMessageFormatIntegrationTest 
extends AbstractConsumerTes
     val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
     assertEquals(40, timestampTopic2P0.offset)
     assertEquals(40, timestampTopic2P0.timestamp)
-    assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
+    assertEquals(Optional.empty, timestampTopic2P0.leaderEpoch)

Review Comment:
   Yes, we don't set the leader epoch for the old message format.



##########
core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala:
##########
@@ -51,8 +78,15 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends 
AbstractConsumerTes
     for (topic <- List(topic1, topic2, topic3)) {
       for (part <- 0 until numParts) {
         val tp = new TopicPartition(topic, part)
-        // In sendRecords(), each message will have key, value and timestamp 
equal to the sequence number.
-        sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
+        // In sendRecords() and sendLegacyRecords(), each message will have 
key, value and timestamp equal to the sequence number.
+        if (topic == topic2) {
+          // append legacy records to topic2
+          appendLegacyRecords(100, tp, 0, part)
+        } else {
+          println("sendRecords")

Review Comment:
   Please remove this line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to