Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-16 Thread via GitHub


showuon merged PR #15542:
URL: https://github.com/apache/kafka/pull/15542


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-15 Thread via GitHub


johnnychhsu commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1526027063


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()

Review Comment:
   Thanks @showuon! I will address this in #15476.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-14 Thread via GitHub


showuon commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998834117

   Thanks all for the review!
   
   I've backported this to v3.7. For v3.6, the code differs more, so I'd like to 
run CI first before pushing the change. PR: 
https://github.com/apache/kafka/pull/15542 .





[PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-14 Thread via GitHub


showuon opened a new pull request, #15542:
URL: https://github.com/apache/kafka/pull/15542

   Backported https://github.com/apache/kafka/pull/15474 to the v3.6 branch. Since 
the code differs more there, I'd like to make sure the backport doesn't break any 
tests.
   
   Fix getOffsetByMaxTimestamp for compressed records.
   
   This PR:
   
   1. For the inPlaceAssignment case, computes the correct offset for 
maxTimestamp while traversing the batch records and sets it in the 
ValidationResult at the end, instead of always setting it to the last offset.
   2. For the non-inPlaceAssignment case, sets offsetOfMaxTimestamp for log 
create time, as in the non-compressed and inPlaceAssignment cases, instead of 
always setting it to the last offset.
   3. Adds tests to verify the fix.
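
The idea in item 1 can be sketched roughly as follows. This is only an illustration and not the code from the PR: the class `MaxTimestampOffsetSketch`, the method `findMaxTimestampAndOffset`, and the use of plain arrays are assumptions made here, and the real validation code works on Kafka record batches rather than `long[]`. The sketch walks the records once and keeps the first offset at which the maximum timestamp is seen.

```java
// Illustrative sketch only; all names are hypothetical and not part of Kafka.
final class MaxTimestampOffsetSketch {

    // Mirrors the "-1 means no timestamp" convention mentioned in this thread.
    static final long NO_TIMESTAMP = -1L;

    /**
     * Traverses the timestamps of one batch whose first record has offset baseOffset.
     * Returns {maxTimestamp, offsetOfMaxTimestamp}, or {-1, -1} for an empty batch.
     */
    static long[] findMaxTimestampAndOffset(long[] timestamps, long baseOffset) {
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict '>' keeps the first offset when several records share the max timestamp.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = baseOffset + i;
            }
        }
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] result = findMaxTimestampAndOffset(new long[] {100L, 999L, 999L}, 0L);
        // prints: maxTimestamp=999, offsetOfMaxTimestamp=1
        System.out.println("maxTimestamp=" + result[0] + ", offsetOfMaxTimestamp=" + result[1]);
    }
}
```

Always reporting the last offset, as described for the old behavior, would give offset 2 for this batch even though the maximum timestamp first appears at offset 1.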
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-14 Thread via GitHub


chia7712 commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998576991

   Thanks for all the reviews, and thanks @showuon. This is not only an important 
fix for Kafka but also a great experience for me :)
   
   @showuon Could you please backport this fix? Thanks!





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-14 Thread via GitHub


chia7712 merged PR #15474:
URL: https://github.com/apache/kafka/pull/15474





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-14 Thread via GitHub


chia7712 commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998564199

   The failed tests pass locally.
   ```sh
   ./gradlew cleanTest core:test --tests LogDirFailureTest --tests ReplicaManagerTest connect:mirror:test --tests MirrorConnectorsIntegrationTransactionsTest streams:test --tests HighAvailabilityTaskAssignorIntegrationTest
   ```
   
   The JIRA links for the flaky tests are shown below:
   
   https://issues.apache.org/jira/browse/KAFKA-16225
   https://issues.apache.org/jira/browse/KAFKA-16376
   https://issues.apache.org/jira/browse/KAFKA-16377
   https://issues.apache.org/jira/browse/KAFKA-15945





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-13 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1524067155


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()

Review Comment:
   That's because the fix for "returning the first offset when multiple records 
have the same maxTimestamp" for non-compressed records is in this PR: 
https://github.com/apache/kafka/pull/15476 . We should add the LogAppendTime 
test there. :) cc @johnnychhsu



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-


Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-13 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1523649729


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,236 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()

Review Comment:
   Why did we exclude the LogAppendTime test in this method?






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522447288


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -242,34 +242,23 @@ public MemoryRecords build() {
 
 /**
  * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+ * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
  *
- * If the log append time is used, the offset will be the last offset unless no compression is used and
- * the message format version is 0 or 1, in which case, it will be the first offset.
+ * If the log append time is used, the offset will be the first offset of the record.
  *
- * If create time is used, the offset will be the last offset unless no compression is used and the message
- * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+ * If create time is used, the offset will always be the offset of the record with the max timestamp.
+ *
+ * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
  *
  * @return The max timestamp and its offset
  */
 public RecordsInfo info() {
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = baseOffset;
-return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-} else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+return new RecordsInfo(logAppendTime, baseOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+// If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   Currently, in `assignOffsetsNonCompressed` and 
`validateMessagesAndAssignOffsetsCompressed`, we always return `[-1, -1]` 
for maxTimestamp and its offset when the magic is `MAGIC_VALUE_V0` 
[here](https://github.com/apache/kafka/pull/15474/files#diff-74aba980d40ab3c5c6fd66d7a27ffb4515181130e475b589834538d05aa408b9L263-L264).
 But when the records are rebuilt, we return `[-1, lastOffset]`, 
which is inconsistent. Fixing it here.
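
As a rough illustration of the resulting semantics, here is a standalone sketch. It is not the actual `MemoryRecordsBuilder` code: the class `RecordsInfoSketch`, its nested enum, and the array return value are hypothetical stand-ins, and only the branching follows the `+` lines of the diff above.

```java
// Illustrative sketch only; a simplified stand-in for the info() logic in the diff.
final class RecordsInfoSketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Returns {timestamp, offset} following the semantics described in the new javadoc. */
    static long[] info(TimestampType timestampType,
                       long logAppendTime,
                       long baseOffset,
                       long maxTimestamp,
                       long offsetOfMaxTimestamp) {
        if (timestampType == TimestampType.LOG_APPEND_TIME) {
            // Every record in the batch gets the same append time,
            // so the first offset with the max timestamp is the base offset.
            return new long[] {logAppendTime, baseOffset};
        }
        // Create time: report the offset tracked for the max timestamp.
        // With no timestamps at all (message format v0) both values stay at -1.
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] appendTime = info(TimestampType.LOG_APPEND_TIME, 500L, 10L, -1L, -1L);
        long[] noTimestamp = info(TimestampType.CREATE_TIME, 500L, 10L, -1L, -1L);
        // prints: [500, 10] and [-1, -1]
        System.out.println("[" + appendTime[0] + ", " + appendTime[1] + "] and ["
                + noTimestamp[0] + ", " + noTimestamp[1] + "]");
    }
}
```

In this sketch the no-timestamp case needs no branch of its own, because the create-time path already returns the `[-1, -1]` defaults, which matches the intent of removing the `NO_TIMESTAMP` branch that returned `lastOffset`.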







Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522294617


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,230 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this separate batch test, it'll be the last offset 2
+verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment:
   Good point! Let me improve it!






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522164475


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,230 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this separate batch test, it'll be the last offset 2
+verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment:
   Do we guarantee that the server time has advanced after appending each 
batch? Ditto for `testThreeRecordsInSeparateBatchWithMessageConversion`
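
The concern above is that two batches appended within the same millisecond would get the same LogAppendTime. One way a test can rule that out is a clock that advances on every read. The sketch below is a hypothetical, self-contained illustration (the `AutoTickClock` class and its method names are assumptions made for this example, not Kafka's API); a later revision of the test quoted in this thread constructs `new MockTime(1)`, which appears intended to serve the same purpose.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical auto-ticking clock; an illustration only, not Kafka's MockTime.
final class AutoTickClock {
    private final AtomicLong nowMs;
    private final long autoTickMs;

    AutoTickClock(long startMs, long autoTickMs) {
        this.nowMs = new AtomicLong(startMs);
        this.autoTickMs = autoTickMs;
    }

    // Returns the current time, then advances the clock by autoTickMs,
    // so two consecutive reads never observe the same value when autoTickMs > 0.
    long milliseconds() {
        return nowMs.getAndAdd(autoTickMs);
    }

    public static void main(String[] args) {
        AutoTickClock clock = new AutoTickClock(1000L, 1L);
        long first = clock.milliseconds();
        long second = clock.milliseconds();
        long third = clock.milliseconds();
        // Each simulated "append" sees a strictly later timestamp than the previous one.
        System.out.println(first < second && second < third); // prints "true"
    }
}
```

With a clock like this, the last of three separately appended batches is guaranteed to carry the largest append time, which is what the expected offset 2 relies on.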



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   Ok. Could we make this setUp() private?




Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522051894


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -243,33 +243,20 @@ public MemoryRecords build() {
 /**
  * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
  *
- * If the log append time is used, the offset will be the last offset 
unless no compression is used and

Review Comment:
   Could you please update `OffsetSpec` also? 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java#L65






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1521398482


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -243,33 +243,20 @@ public MemoryRecords build() {
 /**
  * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
  *
- * If the log append time is used, the offset will be the last offset 
unless no compression is used and

Review Comment:
   Updated. Thanks.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1521123429


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -243,33 +243,20 @@ public MemoryRecords build() {
 /**
  * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
  *
- * If the log append time is used, the offset will be the last offset 
unless no compression is used and

Review Comment:
   @showuon @junrao thanks for this change; I learned a lot from it.
   
   BTW, should we document this behavior? I mean how we decide the offset of
   the max timestamp when multiple records have the same timestamp.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520862105


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -69,6 +97,15 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 createOldMessageFormatBrokers()
 produceMessagesInOneBatch()
 verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")

Review Comment:
   This is to test the logic in `MemoryRecordsBuilder#info`.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520861572


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -60,6 +63,31 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
 produceMessagesInOneBatch("gzip")
 verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)

Review Comment:
   Added test cases for `LogAppendTime` scenarios. This is to test the logic in
   `validateMessagesAndAssignOffsetsCompressed`.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520858564


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   `setUp` is necessary because when `before` (i.e. `@BeforeEach`) runs, the
   Kafka cluster has not been created yet. That's why we can inject custom
   broker properties there. In `setUp`, we can create a producer/admin to talk
   to the brokers.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520857036


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   > This has the same issue. The semantic for MAX_TIMESTAMP is the first 
offset with the max timestamp. So, if timestamp is 
TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of 
lastOffset.
   
   Yes, updated.
   
   > Also, could we remove the shallow part in 
RecordsInfo.shallowOffsetOfMaxTimestamp?
   
   I've added a comment in another PR: 
https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 . We can do 
all the renaming there.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520575467


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   > We can give the test a more descriptive name though, right?
   
   sure. sorry for the lazy naming :)






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520542402


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   We can give the test a more descriptive name though, right?






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   Why do we need to split the logic between `before` and `setUp`?



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {

Review Comment:
   Could this method be private? Ditto for `produceMessagesInSeparateBatch`.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   This has the same issue. The semantic for MAX_TIMESTAMP is the first offset 
with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we 
need to use the baseOffset, instead of lastOffset.
   
   Also, could we remove the shallow part in 
RecordsInfo.shallowOffsetOfMaxTimestamp?
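
The LOG_APPEND_TIME point above can be sketched as follows. This is a minimal illustration, assuming that under log append time every record in the batch is stamped with the same broker time, so the first offset carrying the max timestamp is the batch's base offset. The enum and method here are local stand-ins written for this example, not the actual `MemoryRecordsBuilder` code.

```java
// Hypothetical sketch; the names are local stand-ins, not Kafka classes.
final class AppendTimeOffsetSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Chooses the offset reported alongside the max timestamp.
    // scannedOffsetOfMax is the offset found by scanning record timestamps (create time).
    static long offsetOfMaxTimestamp(TimestampType type, long baseOffset, long scannedOffsetOfMax) {
        // With LOG_APPEND_TIME all records share one timestamp, so the first
        // offset with the max timestamp is the base offset, not the last offset.
        return type == TimestampType.LOG_APPEND_TIME ? baseOffset : scannedOffsetOfMax;
    }

    public static void main(String[] args) {
        // One batch with offsets 0..2 whose largest create-time timestamp sits at offset 1.
        System.out.println(offsetOfMaxTimestamp(TimestampType.LOG_APPEND_TIME, 0L, 1L)); // prints "0"
        System.out.println(offsetOfMaxTimestamp(TimestampType.CREATE_TIME, 0L, 1L));     // prints "1"
    }
}
```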



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {
 val records = Seq(
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
 )
-TestUtils.produceMessages(brokers, records, -1)
+// create a producer with large linger.ms and enough batch.size (default 
is enough for three 10 bytes records),
+// so that we can confirm all records will be accumulated in producer 
until we flush them into one batch.
+val producer = createProducer(
+  plaintextBootstrapServers(brokers),
+  deliveryTimeoutMs = Int.MaxValue,
+  lingerMs = Int.MaxValue,
+  compressionType = compressionType)
+
+try {
+  val futures = records.map(producer.send)
+  producer.flush()
+  futures.foreach(_.get)
+} finally {
+  producer.close()
+}
   }
 
-  def generateConfigs: Seq[KafkaConfig] =
-TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).map(KafkaConfig.fromProps)
+  def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = 
{
+val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 100L,
+null, new Array[Byte](10)))
+val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 999L,
+  null, new Array[Byte](10)))
+val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 200L,
+  null, new Array[Byte](10)))
+
+val producer = createProducer(
+  

Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520206161


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   How about using `CsvSource` to list all cases? For example:
   ```scala
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @CsvSource(Array(
     "zk, true, true, true",
     "zk, true, true, false",
     "zk, true, false, true",
     "zk, true, false, false",
     "zk, false, true, true",
     "zk, false, true, false",
     //"zk, false, false, true", KAFKA-16341 will enable it
     "zk, false, false, false",
     "kraft, true, false, true",
     "kraft, true, false, false",
     //"kraft, false, false, true", KAFKA-16341 will enable it
     "kraft, false, false, false"
   ))
   def test(quorum: String, compression: Boolean, oldMessage: Boolean, oneBatch: Boolean): Unit = {
     if (oldMessage) createOldMessageFormatBrokers()
     if (oneBatch) produceMessagesInOneBatch(if (compression) "gzip" else "none")
     else produceMessagesInSeparateBatch(if (compression) "gzip" else "none")
     verifyListOffsets()
   }
   ```






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1519669214


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,65 +37,149 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+  }

Review Comment:
   Previously this was inefficient: we created a cluster and wrote some data,
   and then verified only one kind of offset. After this PR, each test verifies
   3 kinds of offsets, and there are separate test cases for the different
   scenarios.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1519665612


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   Thanks all for the comments! I've updated the tests. Please take a look when 
available. Thanks.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1988370290

   > Also, [#15476 
(comment)](https://github.com/apache/kafka/pull/15476#discussion_r1518413083) 
applies here too. We just need to fix it in one of the PRs.
   
   Let's fix it in #15476. Thanks.





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1519662867


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -417,8 +422,14 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 long lastOffset = offsetCounter.value - 1;
 firstBatch.setLastOffset(lastOffset);
 
-if (timestampType == TimestampType.LOG_APPEND_TIME)
+if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = lastOffset;

Review Comment:
   You're right. In the `create time` case, we scan through the records one by 
one and only set `offsetOfMaxTimestamp` when a timestamp is greater than the 
previous maximum, so the semantics of MAX_TIMESTAMP are the _first offset_ with 
the max timestamp. I've changed it to `initialOffset` for the `append time` case.
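
   To make the "first offset" rule concrete, here is a rough standalone sketch. 
It is not the `LogValidator` code: the class and method names are made up for 
illustration, and timestamps are passed as a plain list. The input 100, 999, 200 
mirrors the records used in the ListOffsetsIntegrationTest discussion.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the "first offset with the max timestamp" rule; not Kafka's LogValidator. */
final class MaxTimestampOffsetSketch {

    /** Create time: scan in order and move the offset only on a strictly larger timestamp. */
    static long offsetForCreateTime(long initialOffset, List<Long> timestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        long offsetOfMaxTimestamp = -1L; // stays -1 for an empty batch (assumption of this sketch)
        for (int i = 0; i < timestamps.size(); i++) {
            long ts = timestamps.get(i);
            if (ts > maxTimestamp) { // strict '>' keeps the first offset when timestamps tie
                maxTimestamp = ts;
                offsetOfMaxTimestamp = initialOffset + i;
            }
        }
        return offsetOfMaxTimestamp;
    }

    /** Append time: every record gets the same broker timestamp, so the first offset wins. */
    static long offsetForAppendTime(long initialOffset) {
        return initialOffset;
    }

    public static void main(String[] args) {
        System.out.println(offsetForCreateTime(0L, Arrays.asList(100L, 999L, 200L))); // 1
        System.out.println(offsetForCreateTime(5L, Arrays.asList(7L, 9L, 9L)));       // 6
        System.out.println(offsetForAppendTime(5L));                                  // 5
    }
}
```

   With the strict comparison, a tie such as 7, 9, 9 resolves to the earlier 
record, which matches what the append-time branch returns when all timestamps 
are equal.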






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518419803


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -417,8 +422,14 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 long lastOffset = offsetCounter.value - 1;
 firstBatch.setLastOffset(lastOffset);
 
-if (timestampType == TimestampType.LOG_APPEND_TIME)
+if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = lastOffset;

Review Comment:
   Hmm, we should be consistent among different magic values, right? It seems 
the semantic for MAX_TIMESTAMP is the first offset with the max timestamp.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


junrao commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1986585552

   Also, https://github.com/apache/kafka/pull/15476#discussion_r1518413083 
applies here too. We just need to fix it in one of the PRs.





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518065927


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   > So while it is ok to configure via the flush method -- let's make the 
record size smaller. We should confirm the batching is as expected when we run 
the tests. :)
   
   +1 to this comment  






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518058151


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   I don't think there is a need to have 10KB records regardless -- we can't 
have this size in a single batch consistently.
   
   So while it is ok to configure via the flush method -- let's make the record 
size smaller. We should confirm the batching is as expected when we run the 
tests. :) 






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1518055609


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   `producer.flush` is fine if we set large `batch.size` and `linger.ms` so we 
can control when it happens.
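
   As a rough illustration of that setup, the sketch below builds the two 
settings as plain `java.util.Properties`. `batch.size` and `linger.ms` are the 
producer config keys named above; the values and the class name are assumptions 
picked for the example, not numbers from this PR.

```java
import java.util.Properties;

/** Sketch of producer settings that leave the send timing to an explicit flush. */
final class ProducerBatchingConfigSketch {

    static Properties batchingProps() {
        Properties props = new Properties();
        // Illustrative values only: large enough that a handful of small test
        // records would neither fill the batch nor hit the linger timeout.
        props.put("batch.size", "1048576"); // bytes
        props.put("linger.ms", "60000");    // milliseconds
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchingProps();
        System.out.println("batch.size=" + props.getProperty("batch.size"));
        System.out.println("linger.ms=" + props.getProperty("linger.ms"));
    }
}
```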






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-08 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1517951926


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   > We should probably have both cases.
   
   There are two cases we want to test:
   
   1. three records are sent in a single request
   2. three records are sent in different requests
   
   Hence, we should use `producer#flush` instead of setting different record 
sizes. Flushing reliably covers both cases. By contrast, we can't ensure that a 
"larger" record size results in multiple requests, since that depends heavily on 
the buffer size used by the producer. For example, the test would become 
pointless if we increased the default buffer size in the future.
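
   The two cases can be sketched with a toy buffer. This is not the Kafka 
producer: `FlushBatchingSketch` is a hypothetical stand-in that only models 
"records sent between two flushes end up in the same batch", and it assumes 
nothing else (batch size, linger time) triggers a send.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a producer buffer; only an explicit flush sends a batch. */
final class FlushBatchingSketch {
    private final List<List<String>> sentBatches = new ArrayList<>();
    private List<String> pending = new ArrayList<>();

    void send(String record) {
        pending.add(record); // buffered until the next flush
    }

    void flush() {
        if (!pending.isEmpty()) {
            sentBatches.add(pending);
            pending = new ArrayList<>();
        }
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    /** Case 1: three records, one flush at the end, so one batch. */
    static List<List<String>> oneBatch() {
        FlushBatchingSketch producer = new FlushBatchingSketch();
        producer.send("a");
        producer.send("b");
        producer.send("c");
        producer.flush();
        return producer.sentBatches();
    }

    /** Case 2: flush after every record, so three batches. */
    static List<List<String>> separateBatches() {
        FlushBatchingSketch producer = new FlushBatchingSketch();
        for (String record : new String[] {"a", "b", "c"}) {
            producer.send(record);
            producer.flush();
        }
        return producer.sentBatches();
    }

    public static void main(String[] args) {
        System.out.println(oneBatch());        // [[a, b, c]]
        System.out.println(separateBatches()); // [[a], [b], [c]]
    }
}
```

   Neither case depends on record size, which is the point of preferring the 
flush-based approach.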






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-07 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516657707


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   > We should also fix this test to use 10 bytes rather than 10KB since it was 
hiding an error as well.
   
   We should probably have both cases.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-07 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516657707


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   > We should also fix this test to use 10 bytes rather than 10KB since it was 
hiding an error as well.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-07 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516656926


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   @showuon As Justine pointed out, we have a test that verifies that list 
offsets requests work correctly. That's a better test to verify this than the 
offset shell test.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-07 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516626745


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   We should also fix this test to use 10 bytes rather than 10KB since it was 
hiding an error as well.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-07 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1516625703


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   Hey sorry for the delay. I made the following modification to 
ListOffsetsIntegrationTest. I also added the compressionType argument to 
produceMessages in TestUtils.
   
   ```
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testMaxTimestampOffset(quorum: String): Unit = {
     val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
     assertEquals(1, maxTimestampOffset.offset())
   }
   
   ...
   
   def produceMessages(): Unit = {
     val records = Seq(
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
         null, new Array[Byte](10)),
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
         null, new Array[Byte](10)),
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
         null, new Array[Byte](10)),
     )
     TestUtils.produceMessages(brokers, records, -1, compressionType = "gzip")
   }
   ```
   
   Unfortunately, produceMessages is called in the before-each setup, so it 
isn't as easily parameterized. But this took only a few lines of changes and 
reproduced the error.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513659746


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   I'm not sure what you mean by _weird test_ here. I've refactored it to use 
`serverProperties` to provide custom server properties for specific tests, if 
that's what you meant by _weird_. Thanks.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513013595


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   Hmm, this is a weird test to verify this behavior. @hachikuji @jolshan had 
identified a test that seemed to be a better candidate.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512790426


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping

Review Comment:
   You're right! Tests added. Thanks.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512657775


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping

Review Comment:
   It seems to me this also fixes the path of nonexistent magic code 
(`convertAndAssignOffsetsNonCompressed`). Is it possible to add a test for that 
case?






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512607158


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -333,7 +382,7 @@ private void assertExitCodeIsOne(String... args) {
 }
 
 private List expectedOffsetsWithInternal() {
-List consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1)
+List consOffsets = IntStream.range(0, offsetTopicPartitionCount)

Review Comment:
   This is a side fix for this test. Before this PR, 
`offsetTopicPartitionCount` was not fed into the cluster.
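
   For reference, the effect of dropping the `+ 1` follows from 
`IntStream.range` excluding its upper bound. The snippet below is a standalone 
sketch; `partitions` is a hypothetical helper, not a method from 
GetOffsetShellTest.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Shows which partition indexes IntStream.range produces for a given count. */
final class PartitionRangeSketch {

    static List<Integer> partitions(int partitionCount) {
        // range(0, n) yields 0 .. n - 1, i.e. exactly n partition indexes.
        return IntStream.range(0, partitionCount).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int offsetTopicPartitionCount = 4;
        System.out.println(partitions(offsetTopicPartitionCount));     // [0, 1, 2, 3]
        System.out.println(partitions(offsetTopicPartitionCount + 1)); // [0, 1, 2, 3, 4]
    }
}
```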






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1978460395

   @chia7712 @ijuma @hachikuji , please take a look. Thanks.





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -379,8 +381,11 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 && batch.magic() > RecordBatch.MAGIC_VALUE_V0
 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-if (record.timestamp() > maxTimestamp)
+if (record.timestamp() > maxTimestamp) {
 maxTimestamp = record.timestamp();
+// The offset is only increased when it is a valid record
+offsetOfMaxTimestamp = initialOffset + validatedRecords.size();

Review Comment:
   Also set the correct offset of the max timestamp while traversing the records.
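
   A rough sketch of why the offset is derived from `validatedRecords.size()` 
rather than from the position in the batch. This is a simplified model, not the 
actual validator: `Rec` and its `valid` flag are made up for illustration, and 
the sketch assumes an invalid record is simply skipped and consumes no offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: the offset of the max timestamp counts only records validated so far. */
final class ValidatedOffsetSketch {

    /** Hypothetical record: a timestamp plus a flag saying whether validation passed. */
    static final class Rec {
        final long timestamp;
        final boolean valid;

        Rec(long timestamp, boolean valid) {
            this.timestamp = timestamp;
            this.valid = valid;
        }
    }

    static long offsetOfMaxTimestamp(long initialOffset, List<Rec> records) {
        List<Rec> validatedRecords = new ArrayList<>();
        long maxTimestamp = -1L;
        long offsetOfMaxTimestamp = -1L;
        for (Rec record : records) {
            if (!record.valid) {
                continue; // skipped records do not advance the offset
            }
            if (record.timestamp > maxTimestamp) {
                maxTimestamp = record.timestamp;
                // Offset this record will receive: one per record validated before it.
                offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
            }
            validatedRecords.add(record);
        }
        return offsetOfMaxTimestamp;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
            new Rec(100L, true),
            new Rec(5000L, false), // invalid, ignored
            new Rec(999L, true),
            new Rec(200L, true));
        System.out.println(offsetOfMaxTimestamp(10L, batch)); // 11
    }
}
```

   Using the loop index instead would give 12 in this example, one past the 
offset that the record with timestamp 999 actually receives in this model.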



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;

Review Comment:
   I don't understand why we should always set this to the last offset here. 
Doing so fails the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma 
could answer this? 






[PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon opened a new pull request, #15474:
URL: https://github.com/apache/kafka/pull/15474

   Fix `getOffsetByMaxTimestamp` for compressed records.
   
   This PR adds:
   1. For the inPlaceAssignment case, compute the correct offset for 
maxTimestamp while traversing the batch records, and set it in 
`ValidationResult` at the end, instead of always using the last offset.
   2. For the non-inPlaceAssignment case, set offsetOfMaxTimestamp for log 
create time, as in the non-compressed and inPlaceAssignment cases, instead of 
always using the last offset.
   3. Add tests to verify the fix.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

