chia7712 commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1527041931
##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition
partition, Iterable<MutableR
partition, batch.lastOffset(),
maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
-
filterResult.updateRetainedBatchMetadata(info.maxTimestamp,
info.shallowOffsetOfMaxTimestamp,
+
filterResult.updateRetainedBatchMetadata(info.maxTimestamp,
info.recordOffsetOfMaxTimestamp,
Review Comment:
It seems `offsetOfMaxTimestamp` is simpler and more readable. I like the naming
relation between `info.maxTimestamp` and `info.offsetOfMaxTimestamp` :)
##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -82,6 +82,13 @@ class ListOffsetsIntegrationTest extends
KafkaServerTestHarness {
def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
produceMessagesInSeparateBatch()
verifyListOffsets()
+
+ // test LogAppendTime case
+ val props: Properties = new Properties()
+ props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"LogAppendTime")
+ createTopicWithConfig(topicNameWithCustomConfigs, props)
+ produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+ verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
Review Comment:
It would be nice to add comments for this case.
##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -77,11 +77,34 @@ class ListOffsetsIntegrationTest extends
KafkaServerTestHarness {
verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+ produceMessagesInOneBatch()
+ verifyListOffsets()
+
+ // test LogAppendTime case
+ val props: Properties = new Properties()
+ props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"LogAppendTime")
+ createTopicWithConfig(topicNameWithCustomConfigs, props)
+ produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+ // In LogAppendTime's case, the maxTimestampOffset should be the first
record of the batch.
+ // So in this one batch test, it'll be the first offset which is 0
+ verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+ }
+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
Review Comment:
Should we rename it to `testThreeNonCompressedRecordsInSeparateBatch`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]