Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon merged PR #15542: URL: https://github.com/apache/kafka/pull/15542 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
johnnychhsu commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1526027063

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,236 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource

+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }

+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()

Review Comment: Thanks @showuon! I will address this in #15476.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998834117

Thanks all for the review! I've backported the fix to v3.7. For v3.6, the code differs more, so I'd like to run CI first before pushing the change. PR: https://github.com/apache/kafka/pull/15542 .
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998576991

Thanks to all reviewers and to @showuon. This is not only an important fix for Kafka but also a great experience for me :) @showuon Could you please backport this fix? Thanks!
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 merged PR #15474: URL: https://github.com/apache/kafka/pull/15474
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998564199

The failed tests pass on my local machine.

```sh
./gradlew cleanTest core:test --tests LogDirFailureTest --tests ReplicaManagerTest connect:mirror:test --tests MirrorConnectorsIntegrationTransactionsTest streams:test --tests HighAvailabilityTaskAssignorIntegrationTest
```

The links for those flaky tests are shown below:

https://issues.apache.org/jira/browse/KAFKA-16225
https://issues.apache.org/jira/browse/KAFKA-16376
https://issues.apache.org/jira/browse/KAFKA-16377
https://issues.apache.org/jira/browse/KAFKA-15945
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1524067155

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,236 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource

+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }

+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()

Review Comment: That's because the fix for "returning the first offset when multiple records have the same maxTimestamp" for non-compressed records is in this PR: https://github.com/apache/kafka/pull/15476 . We should add the LogAppendTime test there. :) cc @johnnychhsu

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,236 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource

+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1523649729

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,236 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource

+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }

+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()

Review Comment: Why did we exclude the LogAppendTime test in this method?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522447288

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -242,34 +242,23 @@ public MemoryRecords build() {

     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the last offset unless no compression is used and
-     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * If the log append time is used, the offset will be the first offset of the record.
      *
-     * If create time is used, the offset will be the last offset unless no compression is used and the message
-     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+     * If create time is used, the offset will always be the offset of the record with the max timestamp.
+     *
+     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = baseOffset;
-            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+            return new RecordsInfo(logAppendTime, baseOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment: Currently, in `assignOffsetsNonCompressed` or `validateMessagesAndAssignOffsetsCompressed`, we always return `[-1, -1]` for maxTimestamp and offset if the magic is `MAGIC_VALUE_V0` [here](https://github.com/apache/kafka/pull/15474/files#diff-74aba980d40ab3c5c6fd66d7a27ffb4515181130e475b589834538d05aa408b9L263-L264). But when the records are re-built, we return `[-1, lastOffset]`, which is inconsistent. Fixing it here.
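The semantics described in the updated Javadoc above can be sketched in isolation. This is a minimal, hypothetical model and not the actual `MemoryRecordsBuilder` code: the class `MaxTimestampSketch`, its `Info` holder, and both method names are assumptions made for illustration. It only tries to show that a strict `>` comparison yields the first offset when several records share the max timestamp, that log append time maps to the base offset, and that records without timestamps keep the default `[-1, -1]`.

```java
// Hypothetical, simplified model of the semantics discussed in this review.
// It is NOT the real MemoryRecordsBuilder; all names here are illustrative.
final class MaxTimestampSketch {
    static final long NO_TIMESTAMP = -1L;

    /** Plays the role of RecordsInfo: a (maxTimestamp, offsetOfMaxTimestamp) pair. */
    static final class Info {
        final long maxTimestamp;
        final long offsetOfMaxTimestamp;

        Info(long maxTimestamp, long offsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
        }

        @Override
        public String toString() {
            return "[" + maxTimestamp + ", " + offsetOfMaxTimestamp + "]";
        }
    }

    /** Create time: report the offset of the first record that carries the max timestamp. */
    static Info createTimeInfo(long baseOffset, long[] timestamps) {
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict '>' keeps the earlier offset when two records tie.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = baseOffset + i;
            }
        }
        // If no record has a timestamp, the defaults [-1, -1] are returned.
        return new Info(maxTimestamp, offsetOfMaxTimestamp);
    }

    /** Log append time: every record shares one timestamp, so the base offset is reported. */
    static Info logAppendTimeInfo(long baseOffset, long logAppendTime) {
        return new Info(logAppendTime, baseOffset);
    }

    public static void main(String[] args) {
        System.out.println(createTimeInfo(0L, new long[] {10L, 30L, 30L})); // [30, 1]
        System.out.println(logAppendTimeInfo(0L, 100L));                    // [100, 0]
        System.out.println(createTimeInfo(5L, new long[] {-1L, -1L}));      // [-1, -1]
    }
}
```

The old code returned `lastOffset` in several branches, which in this model would correspond to reporting offset 2 for the first call instead of offset 1.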
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522294617

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -19,82 +19,230 @@ package kafka.admin

 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource

+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }

   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-    assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-    assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this separate batch test, it'll be the last offset 2
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment: Good point! Let me improve it!
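The two expectations in the quoted test comments (offset 0 for one batch, offset 2 for three separate batches) can be reproduced with a small model. This is only a sketch under stated assumptions: `LogAppendTimeSketch` and `maxTimestampOffset` are hypothetical names, the broker clock is assumed to advance by a fixed tick between appends (loosely mimicking an auto-ticking mock clock), and the log is assumed to keep the earlier offset when a new batch does not have a strictly larger timestamp. None of this is taken from the broker code.

```java
// Hypothetical model of LogAppendTime across batches; names are illustrative only.
final class LogAppendTimeSketch {

    /**
     * Appends batches of the given sizes, stamping each batch with the current
     * clock value, and returns the offset reported for the max timestamp
     * (or -1 if nothing was appended).
     */
    static long maxTimestampOffset(int[] batchSizes, long startTimeMs, long tickMs) {
        long clock = startTimeMs;
        long nextOffset = 0L;
        long maxTimestampSoFar = -1L;
        long offsetOfMaxTimestampSoFar = -1L;

        for (int size : batchSizes) {
            // With LogAppendTime, every record in the batch shares one timestamp,
            // so the batch contributes (timestamp, base offset of the batch).
            long batchTimestamp = clock;
            long baseOffset = nextOffset;

            // Strict '>' keeps the earliest offset among equal timestamps.
            if (batchTimestamp > maxTimestampSoFar) {
                maxTimestampSoFar = batchTimestamp;
                offsetOfMaxTimestampSoFar = baseOffset;
            }

            nextOffset += size;
            clock += tickMs; // assumed: time advances between appends
        }
        return offsetOfMaxTimestampSoFar;
    }

    public static void main(String[] args) {
        // Three records in one batch: first offset of that batch.
        System.out.println(maxTimestampOffset(new int[] {3}, 100L, 1L));       // 0
        // Three single-record batches with a ticking clock: the last batch wins.
        System.out.println(maxTimestampOffset(new int[] {1, 1, 1}, 100L, 1L)); // 2
    }
}
```

If the clock did not advance between appends (a tick of 0), this model would report offset 0 for the three-batch case as well, which is one reason a test of this kind benefits from a controllable clock.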
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522164475 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -19,82 +19,230 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.utils.Utils import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.Properties import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" + val topicNameWithCustomConfigs = "foo2" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) -createTopic(topicName, 1, 1.toShort) -produceMessages() +createTopicWithConfig(topicName, new Properties()) adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: 
String): Unit = { +produceMessagesInOneBatch("gzip") +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this one batch test, it'll be the first offset 0 +verifyListOffsets(topic = topicNameWithCustomConfigs, 0) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testLatestOffset(quorum: String): Unit = { -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) -assertEquals(3, latestOffset.offset()) + def testThreeRecordsInSeparateBatch(quorum: String): Unit = { +produceMessagesInSeparateBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this separate batch test, it'll be the last offset 2 +verifyListOffsets(topic = topicNameWithCustomConfigs, 2) Review Comment: Do we guarantee that the server time has advanced after appending each batch? 
Ditto for `testThreeRecordsInSeparateBatchWithMessageConversion` ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); +} +public void setUp() { Review Comment: Ok. Could we make this setUp() private?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522051894 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -243,33 +243,20 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. * - * If the log append time is used, the offset will be the last offset unless no compression is used and Review Comment: Could you please update `OffsetSpec` also? https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java#L65
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1521398482 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -243,33 +243,20 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. * - * If the log append time is used, the offset will be the last offset unless no compression is used and Review Comment: Updated. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1521123429 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -243,33 +243,20 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. * - * If the log append time is used, the offset will be the last offset unless no compression is used and Review Comment: @showuon @junrao thanks for this change; I learned a lot from it. BTW, should we document this behavior? I mean how we decide the offset of the max timestamp when multiple records have the same timestamp.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520862105 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -69,6 +97,15 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { createOldMessageFormatBrokers() produceMessagesInOneBatch() verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") Review Comment: This is to test the logic in `RecordsInfo#info`.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520861572 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -60,6 +63,31 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { produceMessagesInOneBatch("gzip") verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this one batch test, it'll be the first offset 0 +verifyListOffsets(topic = topicNameWithCustomConfigs, 0) Review Comment: Added test cases for `LogAppendTime` scenarios. This is to test the logic in `validateMessagesAndAssignOffsetsCompressed`.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520858564 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); +} +public void setUp() { Review Comment: `setUp` is necessary because when `before` (i.e. the `@BeforeEach` method) runs, the Kafka cluster has not been created yet. That's why we can inject custom broker properties there. In `setUp`, the cluster is up, so we can create a producer/admin to talk to the brokers.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520857036 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping +return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); Review Comment: > This has the same issue. The semantic for MAX_TIMESTAMP is the first offset with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of lastOffset. Yes, updated. > Also, could we remove the shallow part in RecordsInfo.shallowOffsetOfMaxTimestamp? I've added a comment in another PR: https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 . We can do all the renaming there. 
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520575467 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: > We can give the test a more descriptive name though, right? sure. sorry for the lazy naming :)
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
jolshan commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520542402 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: We can give the test a more descriptive name though, right?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); +} +public void setUp() { Review Comment: Why do we need to split the logic between `before` and `setUp`? ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { Review Comment: Could this method be private? Ditto for `produceMessagesInSeparateBatch`. 
## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping +return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); Review Comment: This has the same issue. The semantic for MAX_TIMESTAMP is the first offset with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of lastOffset. Also, could we remove the shallow part in RecordsInfo.shallowOffsetOfMaxTimestamp? 
## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { val records = Seq( new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), ) -TestUtils.produceMessages(brokers, records, -1) +// create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records), +// so that we can confirm all records will be accumulated in producer until we flush them into one batch. +val producer = createProducer( + plaintextBootstrapServers(brokers), + deliveryTimeoutMs = Int.MaxValue, + lingerMs = Int.MaxValue, + compressionType = compressionType) + +try { + val futures = records.map(producer.send) + producer.flush() + futures.foreach(_.get) +} finally { + producer.close() +} } - def generateConfigs: Seq[KafkaConfig] = -TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps) + def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = { +val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, +null, new Array[Byte](10))) +val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, + null, new Array[Byte](10))) +val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, + null, new Array[Byte](10))) + +val producer = createProducer( + plai
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520206161 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: How about using `CsvSource` to list all cases? 
For example:

```scala
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@CsvSource(Array(
  "zk, true, true, true",
  "zk, true, true, false",
  "zk, true, false, true",
  "zk, true, false, false",
  "zk, false, true, true",
  "zk, false, true, false",
  //"zk, false, false, true", KAFKA-16341 will enable it
  "zk, false, false, false",
  "kraft, true, false, true",
  "kraft, true, false, false",
  //"kraft, false, false, true", KAFKA-16341 will enable it
  "kraft, false, false, false"
))
def test(quorum: String, compression: Boolean, oldMessage: Boolean, oneBatch: Boolean): Unit = {
  if (oldMessage) createOldMessageFormatBrokers()
  if (oneBatch) produceMessagesInOneBatch(if(compression) "gzip" else "none")
  else produceMessagesInSeparateBatch(if(compression) "gzip" else "none")
  verifyListOffsets()
}
```
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1519669214 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,65 +37,149 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { +produceMessagesInOneBatch("gzip") +verifyListOffsets() + } Review Comment: The previous tests were inefficient: each one created a cluster and wrote some data, but then verified only one kind of offset. After this PR, I verify 3 kinds of offsets in one test, and have created different test cases for different scenarios.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1519665612 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +55,30 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before(TestInfo testInfo) { Review Comment: Thanks all for the comments! I've updated the tests. Please take a look when available. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1988370290 > Also, [#15476 (comment)](https://github.com/apache/kafka/pull/15476#discussion_r1518413083) applies here too. We just need to fix it in one of the PRs. Let's fix it in #15476. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1519662867 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -417,8 +422,14 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse long lastOffset = offsetCounter.value - 1; firstBatch.setLastOffset(lastOffset); -if (timestampType == TimestampType.LOG_APPEND_TIME) +if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = lastOffset; Review Comment: You're right. In the `create time` case, we scan through the records one by one and only set `offsetOfMaxTimestamp` when a record's timestamp is greater than the previous maximum, so the semantic for MAX_TIMESTAMP is the _first offset_ with the max timestamp. Changed it to `initialOffset` for the `append time` case.
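The semantics described in the comment above can be sketched in plain Java. This is only an illustrative model, not the actual `LogValidator` or `MemoryRecordsBuilder` code: the class `MaxTimestampOffsetSketch`, its methods, and the array-based batch representation are hypothetical, and `-1` is assumed as the "no timestamp" sentinel. With create time, a strict `>` comparison keeps the first offset that carries the max timestamp; with log append time every record in the batch gets the same broker timestamp, so the first offset of the batch is returned.

```java
// Hypothetical model of "first offset with the max timestamp"; not Kafka's real classes.
final class MaxTimestampOffsetSketch {

    /** Simple holder for the max timestamp and the offset it maps to. */
    static final class Result {
        final long maxTimestamp;
        final long offsetOfMaxTimestamp;

        Result(long maxTimestamp, long offsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
        }
    }

    /**
     * Create-time case: scan the records one by one. The strict '>' means a later
     * record with an equal timestamp never replaces the earlier one, so ties
     * resolve to the first offset.
     */
    static Result forCreateTime(long baseOffset, long[] timestamps) {
        long maxTimestamp = -1L;   // assumed sentinel for "no timestamp"
        long offsetOfMax = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMax = baseOffset + i;
            }
        }
        return new Result(maxTimestamp, offsetOfMax);
    }

    /**
     * Log-append-time case: all records share the broker timestamp 'now',
     * so the first offset with the max timestamp is the batch's first offset.
     */
    static Result forLogAppendTime(long baseOffset, long now) {
        return new Result(now, baseOffset);
    }
}
```

For the three timestamps used in the integration test (100, 999, 200) starting at offset 0, `forCreateTime` maps the max timestamp 999 to offset 1, and it still returns offset 1 if the third record also carried 999.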
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1518419803 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -417,8 +422,14 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse long lastOffset = offsetCounter.value - 1; firstBatch.setLastOffset(lastOffset); -if (timestampType == TimestampType.LOG_APPEND_TIME) +if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = lastOffset; Review Comment: Hmm, we should be consistent among different magic values, right? It seems the semantic for MAX_TIMESTAMP is the first offset with the max timestamp.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1986585552 Also, https://github.com/apache/kafka/pull/15476#discussion_r1518413083 applies here too. We just need to fix it in one of the PRs.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1518065927 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +55,30 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before(TestInfo testInfo) { Review Comment: > So while it is ok to configure via the flush method -- let's make the record size smaller. We should confirm the batching is as expected when we run the tests. :) +1 to this comment 👍
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518058151

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
I don't think there is a need to have 10KB records regardless -- we can't have this size in a single batch consistently. So while it is ok to configure via the flush method -- let's make the record size smaller. We should confirm the batching is as expected when we run the tests. :)
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518055609

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
`producer.flush` is fine if we set large `batch.size` and `linger.ms` so we can control when it happens.
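A rough sketch of the producer settings this suggestion points at, assuming the test builds its producer from a `Properties` object. Only the standard producer config names `bootstrap.servers`, `batch.size` and `linger.ms` are used; the class name, method name and the concrete values are illustrative assumptions, and the result is not verified against the actual test harness.

```java
import java.util.Properties;

final class FlushControlledBatchingSketch {
    // Hypothetical helper: builds producer settings under which the client
    // should not send a batch on its own before the test calls flush().
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed values: a batch far larger than a few 10-byte test records...
        props.put("batch.size", Integer.toString(1024 * 1024));
        // ...and a linger long enough that only an explicit flush() triggers the send.
        props.put("linger.ms", Long.toString(60_000L));
        // A real KafkaProducer would also need key/value serializers configured.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092"));
    }
}
```

With settings like these, sending three records and then calling `flush()` once is intended to exercise the single-request case, while calling `flush()` after each send is intended to exercise the separate-requests case.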
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1517951926

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
> We should probably have both cases.

There are two cases we want to test:
1. three records are in a single request
2. three records are sent in different requests

Hence, we should use `producer#flush` instead of setting different record sizes. The former certainly addresses both cases. By contrast, we can't ensure that a "larger" record size results in multiple requests, since that depends heavily on the buffer size used by the producer. For example, the test will become pointless if we increase the default buffer size in the future.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516657707

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
> We should also fix this test to use 10 bytes rather than 10KB since it was hiding an error as well.

We should probably have both cases.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516657707

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
> We should also fix this test to use 10 bytes rather than 10KB since it was hiding an error as well.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516656926

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
@showuon As Justine pointed out, we have a test that verifies that list offsets requests work correctly. That's a better test to verify this than the offset shell test.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516626745

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
We should also fix this test to use 10 bytes rather than 10KB since it was hiding an error as well.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516625703

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
Hey, sorry for the delay. I made the following modification to ListOffsetsIntegrationTest. I also added the compressionType argument to produceMessages in TestUtils.

```
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testMaxTimestampOffset(quorum: String): Unit = {
  val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
  assertEquals(1, maxTimestampOffset.offset())
}

...

def produceMessages(): Unit = {
  val records = Seq(
    new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, null, new Array[Byte](10)),
    new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, null, new Array[Byte](10)),
    new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, null, new Array[Byte](10)),
  )
  TestUtils.produceMessages(brokers, records, -1, compressionType = "gzip")
}
```

Unfortunately produceMessages is called in before each, so it isn't as easily parameterized. But this took only a few lines of changes and reproduced the error.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513659746

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
I'm not sure what _weird test_ you mean here. I've refactored it by using `serverProperties` to provide custom server properties for specific tests, if that's what you meant by _weird_. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513013595

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";

     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }

     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }

-    public void setUp() {
+    @BeforeEach
+    public void before(TestInfo testInfo) {

Review Comment:
Hmm, this is a weird test to verify this behavior. @hachikuji @jolshan had identified a test that seemed to be a better candidate.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512790426

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
     } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
         return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
     } else {
-        long shallowOffsetOfMaxTimestamp;
-        // Use the last offset when dealing with record batches
-        if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-            shallowOffsetOfMaxTimestamp = lastOffset;
-        else
-            shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-        return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+        // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping

Review Comment:
You're right! Tests added. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512657775

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
     } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
         return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
     } else {
-        long shallowOffsetOfMaxTimestamp;
-        // Use the last offset when dealing with record batches
-        if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-            shallowOffsetOfMaxTimestamp = lastOffset;
-        else
-            shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-        return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+        // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping

Review Comment:
It seems to me this also fixes the path of nonexistent magic code (`convertAndAssignOffsetsNonCompressed`). Is it possible to add a test for that case?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512607158

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -333,7 +382,7 @@ private void assertExitCodeIsOne(String... args) {
     }

     private List expectedOffsetsWithInternal() {
-        List consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1)
+        List consOffsets = IntStream.range(0, offsetTopicPartitionCount)

Review Comment:
This is a side fix for this test. Before this PR, `offsetTopicPartitionCount` was not fed into the cluster.
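As a small illustration of why the `+ 1` matters: `IntStream.range` treats its end bound as exclusive, so `range(0, n)` already covers partitions `0` through `n - 1`, and `range(0, n + 1)` yields one extra entry. The sketch below uses a hypothetical helper name and plain integers rather than the test's actual row type, which is not shown in the hunk.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionRangeSketch {
    // Hypothetical helper: partition ids for a topic with the given partition count.
    static List<Integer> partitionIds(int partitionCount) {
        // The end bound is exclusive, so this yields 0 .. partitionCount - 1.
        return IntStream.range(0, partitionCount).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int offsetTopicPartitionCount = 4;
        System.out.println(partitionIds(offsetTopicPartitionCount));     // [0, 1, 2, 3]
        System.out.println(partitionIds(offsetTopicPartitionCount + 1)); // [0, 1, 2, 3, 4]
    }
}
```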
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1978460395

@chia7712 @ijuma @hachikuji, please take a look. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -379,8 +381,11 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
         && batch.magic() > RecordBatch.MAGIC_VALUE_V0
         && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-    if (record.timestamp() > maxTimestamp)
+    if (record.timestamp() > maxTimestamp) {
         maxTimestamp = record.timestamp();
+        // The offset is only increased when it is a valid record
+        offsetOfMaxTimestamp = initialOffset + validatedRecords.size();

Review Comment:
Also set the correct offset of the max timestamp while traversing the records.

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
     } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
         return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
     } else {
-        long shallowOffsetOfMaxTimestamp;
-        // Use the last offset when dealing with record batches
-        if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-            shallowOffsetOfMaxTimestamp = lastOffset;

Review Comment:
I don't understand why we should always set it to the last offset here. This will fail the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma could answer this?
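The effect of the removed branch can be sketched in isolation. The code below is not `MemoryRecordsBuilder` itself: the class, method names and the boolean `compressed` flag are simplified stand-ins, and the scenario (three records with timestamps 100, 999, 200 at offsets 0, 1, 2 in a compressed batch) is borrowed from the integration test discussed elsewhere in this thread, which expects offset 1.

```java
final class ShallowOffsetSketch {
    // Stand-in for RecordBatch.MAGIC_VALUE_V2.
    static final int MAGIC_VALUE_V2 = 2;

    // Simplified version of the removed branch: compressed or v2 batches
    // always reported the last offset of the batch.
    static long oldOffset(boolean compressed, int magic, long lastOffset, long offsetOfMaxTimestamp) {
        if (compressed || magic >= MAGIC_VALUE_V2)
            return lastOffset;
        return offsetOfMaxTimestamp;
    }

    // Simplified version of the behaviour after the change: always report the
    // offset tracked for the record with the max timestamp.
    static long newOffset(long offsetOfMaxTimestamp) {
        return offsetOfMaxTimestamp;
    }

    public static void main(String[] args) {
        long lastOffset = 2L;            // offsets 0, 1, 2
        long offsetOfMaxTimestamp = 1L;  // timestamp 999 sits at offset 1
        System.out.println(oldOffset(true, MAGIC_VALUE_V2, lastOffset, offsetOfMaxTimestamp)); // 2
        System.out.println(newOffset(offsetOfMaxTimestamp));                                   // 1
    }
}
```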