junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1526587744
##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -82,6 +82,13 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
produceMessagesInSeparateBatch()
verifyListOffsets()
+
+ // test LogAppendTime case
+ val props: Properties = new Properties()
+ props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+ createTopicWithConfig(topicNameWithCustomConfigs, props)
+ produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+ verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
Review Comment:
It's fine to add this test, but it doesn't require the fix in `LogValidator`
in this PR, right? If each batch has only a single record, the offset for
maxTimestamp is always the same for `LogAppendTime` and `CreateTime`.
It seems that we need to add a test case that includes multiple records in a
non-compressed batch.
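To illustrate the point, here is a minimal sketch in plain Java. It is a simplified model, not Kafka's actual `LogValidator` code: the class and method names are hypothetical, a batch is reduced to a base offset plus an array of record timestamps, and it assumes that under `LogAppendTime` every record in a batch gets the same broker timestamp, so the offset of the max timestamp is the offset of the first record.

```java
// Hypothetical model for illustration only; none of these names exist in Kafka.
class MaxTimestampOffsetSketch {

    // CreateTime: each record keeps its own timestamp, so the result is the
    // offset of the first record that carries the largest timestamp.
    static long offsetForCreateTime(long baseOffset, long[] timestamps) {
        int maxIndex = 0;
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] > timestamps[maxIndex]) {
                maxIndex = i;
            }
        }
        return baseOffset + maxIndex;
    }

    // LogAppendTime (assumed behavior): all records in the batch share the
    // broker's append time, so the first record already has the max timestamp.
    static long offsetForLogAppendTime(long baseOffset, long[] timestamps) {
        return baseOffset;
    }

    public static void main(String[] args) {
        long[] singleRecordBatch = {100L};
        long[] multiRecordBatch = {100L, 300L, 200L};

        // Single-record batch: both timestamp types give the same offset.
        System.out.println(offsetForCreateTime(5L, singleRecordBatch));    // 5
        System.out.println(offsetForLogAppendTime(5L, singleRecordBatch)); // 5

        // Multi-record batch: the two timestamp types disagree.
        System.out.println(offsetForCreateTime(5L, multiRecordBatch));     // 6
        System.out.println(offsetForLogAppendTime(5L, multiRecordBatch));  // 5
    }
}
```

With one record per batch, the two cases cannot be told apart, which is why a test with several records in one non-compressed batch is needed to exercise the fix.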
##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
- filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+ filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.recordOffsetOfMaxTimestamp,
Review Comment:
There are still a few places that use `shallowOffsetOfMaxTimestamp`. We need to
fix them all:
- `FilterResult` and `updateRetainedBatchMetadata()` in `MemoryRecords`
- `append()` in `LocalLog`
- `append()` in `LogSegment`
##########
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##########
@@ -173,9 +173,9 @@ class LogValidatorTest {
assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
- // we index from last offset in version 2 instead of base offset
- val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
- assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs,
+ // If it's LOG_APPEND_TIME, the offset will be the first offset of the record
Review Comment:
the first offset of the record => the offset of the first record