Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-08 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1594849626


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   see https://github.com/apache/kafka/pull/15904



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-08 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1594333143


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   > yep, that is a "TYPO" but it does not change the test.
   
   The issue is that the test is testing the wrong expected value. For magic of 
1, the offset for max timestamp should be 1 instead of 2.
   
   > However, I do observe a potential bug.
   
   Yes, this can lead to inaccurate LogAppendInfo.sourceCompression. But it 
doesn't seem to have real impact now. LogAppendInfo.sourceCompression is only 
used in LogValidator, which is only called by the leader. In the leader, 
currently, we expect only 1 batch per producer.









Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593260374


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   On a separate note, `LogValidator` has already been moved to the storage 
module, but its unit test is still in the core module, which is a bit odd. We 
can rewrite the test in Java together with the bug fix and then move it to the 
storage module. I have filed https://issues.apache.org/jira/browse/KAFKA-16689






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593241011


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   yep, that is a "TYPO" but it does not change the test. We pass "NONE" when 
creating `LogValidator`, so the test runs the `assignOffsetsNonCompressed` path:
   
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala#L377
   
   However, I do observe a potential bug. 
   
   **context**
   1. The batches can use different compression types
   2. We take the compression from the last batch
   
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/log/UnifiedLog.scala#L1180
   
   **potential bug**
   
   topic-level compression = GZIP
   batch_0 = NONE
   batch_1 = GZIP
   
   In this case, we don't rebuild the records according to the topic-level 
compression, because the compression of the "last batch" is already `GZIP`. 
As a result, batch_0 is stored with the wrong compression.
   
   This bug does not produce corrupt records, so we could simply add 
comments/docs describing the issue. Alternatively, we could fix it by changing 
`sourceCompression` into a "collection" of all batches' compression types and 
converting whenever any of them does not match.
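
To make the scenario above concrete, here is a minimal, self-contained Java sketch. It is not Kafka's code: `SourceCompressionSketch`, its `Compression` enum, and the helper methods are hypothetical names used only for illustration. It contrasts deciding on the last batch's compression alone with checking the compression of every batch.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class SourceCompressionSketch {
    // Hypothetical stand-in for the broker's compression types.
    enum Compression { NONE, GZIP }

    // Behavior described in the thread: only the last batch's compression is kept.
    static Compression lastBatchCompression(List<Compression> batches) {
        return batches.get(batches.size() - 1);
    }

    // Proposed alternative: keep the compression of every batch in the request.
    static Set<Compression> allBatchCompressions(List<Compression> batches) {
        return batches.isEmpty()
            ? EnumSet.noneOf(Compression.class)
            : EnumSet.copyOf(batches);
    }

    // Conversion decision based on the last batch only.
    static boolean needsConversionLastOnly(List<Compression> batches, Compression target) {
        return lastBatchCompression(batches) != target;
    }

    // Conversion decision that fires if any batch differs from the target.
    static boolean needsConversionAll(List<Compression> batches, Compression target) {
        for (Compression c : allBatchCompressions(batches)) {
            if (c != target) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // batch_0 = NONE, batch_1 = GZIP, topic-level compression = GZIP
        List<Compression> batches = List.of(Compression.NONE, Compression.GZIP);
        System.out.println(needsConversionLastOnly(batches, Compression.GZIP)); // prints "false"
        System.out.println(needsConversionAll(batches, Compression.GZIP));      // prints "true"
    }
}
```

With batch_0 = NONE and batch_1 = GZIP under a GZIP topic, the last-batch check reports that no conversion is needed, while the collection-based check flags the mismatch.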






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1592859538


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   @chia7712 : There seems to be an existing bug. The method is 
`checkNonCompressed()`, but in line 370, we set the compression codec to GZIP.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2046489133

   @junrao @showuon thanks for all your reviews and help!





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 merged PR #15621:
URL: https://github.com/apache/kafka/pull/15621





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2046457414

   ```
   Build / JDK 21 and Scala 2.13 / testInvalidPasswordSaslScram() – 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureNoDelayTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16497
   ```
   Build / JDK 21 and Scala 2.13 / testReplicateFromLatest() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16383
   ```
   Build / JDK 21 and Scala 2.13 / testAlterSinkConnectorOffsetsZombieSinkTasks 
– org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15917
   ```
   Build / JDK 21 and Scala 2.13 / testGetSinkConnectorOffsets – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16498
   ```
   Build / JDK 21 and Scala 2.13 / 
testResetSinkConnectorOffsetsOverriddenConsumerGroupId – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15891
   ```
   Build / JDK 21 and Scala 2.13 / testCacheEviction() – 
org.apache.kafka.server.ClientMetricsManagerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16499
   ```
   Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16136
   ```
   Build / JDK 17 and Scala 2.13 / "testTrustStoreAlter(String).quorum=kraft" – 
kafka.server.DynamicBrokerReconfigurationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16500
   ```
   Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – 
kafka.api.ConsumerBounceTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15146
   ```
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15927
   ```
   Build / JDK 11 and Scala 2.13 / testSeparateOffsetsTopic – 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-14089
   ```
   Build / JDK 11 and Scala 2.13 / testConsumptionWithBrokerFailures() – 
kafka.api.ConsumerBounceTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15146
   ```
   Build / JDK 11 and Scala 2.13 / 
"testCreateUserWithDelegationToken(String).quorum=kraft" – 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16501
   ```
   Build / JDK 11 and Scala 2.13 / 
"testBrokerHeartbeatDuringMigration(MetadataVersion).metadataVersion=3.4-IV0" – 
org.apache.kafka.controller.QuorumControllerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15963
   ```
   Build / JDK 11 and Scala 2.13 / 
shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16502
   ```
   Build / JDK 11 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [1] 
Type=Rt-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT – 
org.apache.kafka.tools.MetadataQuorumCommandTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15104
   
   ok, they all pass locally and each has a jira.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045917889

   >  In the last run, it seems that JDK 21 and Scala 2.13 didn't complete. 
Could you trigger another build? Typically, this could be done by closing the 
PR, waiting for 20 secs and opening it again.
   
   thanks for the tip. I rebased the code to trigger QA and make sure this PR 
works well with the latest code :)





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045841631

   @chia7712 : Thanks for triaging the failed tests. In the last run, it seems 
that JDK 21 and Scala 2.13 didn't complete. Could you trigger another build? 
Typically, this could be done by closing the PR, waiting for 20 secs and 
opening it again.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045819511

   ```
   Build / JDK 11 and Scala 2.13 / 
testLowMaxFetchSizeForRequestAndPartition(String, 
String).quorum=kraft+kip848.groupProtocol=consumer – 
kafka.api.PlaintextConsumerFetchTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16494
   ```
   Build / JDK 11 and Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15898
   ```
   Build / JDK 17 and Scala 2.13 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15945
   ```
   Build / JDK 17 and Scala 2.13 / testReplicateFromLatest() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16383
   ```
   Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, 
String).quorum=kraft.groupProtocol=classic – kafka.api.SslConsumerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16024
   ```
   Build / JDK 8 and Scala 2.12 / 
"testCommitTransactionTimeout(String).quorum=kraft+kip848" – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16495
   ```
   Build / JDK 8 and Scala 2.12 / testDescribeQuorumReplicationSuccessful [2] 
Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – 
org.apache.kafka.tools.MetadataQuorumCommandTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15104
   
   @junrao those failed tests pass locally, and they all have jiras now. Please 
review this PR again. Thanks!





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045781013

   ```
   Build / JDK 11 and Scala 2.13 / 
testLowMaxFetchSizeForRequestAndPartition(String, 
String).quorum=kraft+kip848.groupProtocol=consumer – 
kafka.api.PlaintextConsumerFetchTest
   ```





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2044846640

   >  Thanks for the updated PR. The code looks good to me. There were 50 
failed tests. Is any of them related to the PR? If not, have they all been 
tracked?
   
   there are many timeout exceptions, so I suspect they were caused by a busy 
server. I will trigger QA again instead of filing a bunch of flaky-test issues.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043792883

   @chia7712 : Thanks for the updated PR. The code looks good to me. There were 
50 failed tests. Is any of them related to the PR? If not, have they all been 
tracked?





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043274125

   @junrao thanks for the reviews. Both comments are addressed in 
https://github.com/apache/kafka/pull/15621/commits/581242c1fa6c005bf91a7ced96932774c2c02cd9





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-08 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1556068446


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -56,11 +60,33 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
+val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName)
+assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset())
+assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testListVersion0(quorum: String): Unit = {
+// create records for version 0
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0)
+produceMessagesInSeparateBatch()
+
+// update version to version 1 to list offset for max timestamp
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
+// the offset of max timestamp is always -1 if the batch version is 0
+verifyListOffsets(expectedMaxTimestampOffset = -1)
+

Review Comment:
   extra new line



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -123,7 +149,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   @ParameterizedTest
   @ValueSource(strings = Array("zk"))
   def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
-createOldMessageFormatBrokers()
+createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)

Review Comment:
   > // In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
   > // So in this separate batch test, it'll be the last offset 2
   
   The comment in line 159 is not very accurate. Since we advance the time for 
each batch, the maxTimestampOffset is the message in the last batch.
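
As a rough illustration of that point, the sketch below scans records in offset order and keeps the offset of the first record that carries the highest timestamp. It assumes ties resolve to the earliest offset and that an empty log yields -1; `MaxTimestampOffsetSketch` and `Rec` are hypothetical names, not Kafka classes.

```java
import java.util.List;

final class MaxTimestampOffsetSketch {
    // Hypothetical sentinel for "no record found" (an assumption for this sketch).
    static final long UNKNOWN_OFFSET = -1L;

    // Hypothetical minimal record view: an offset plus its timestamp.
    static final class Rec {
        final long offset;
        final long timestampMs;

        Rec(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    // Returns the offset of the first record with the maximum timestamp,
    // or UNKNOWN_OFFSET when there are no records.
    static long offsetOfMaxTimestamp(List<Rec> recordsInOffsetOrder) {
        long maxTimestampMs = Long.MIN_VALUE;
        long offsetOfMax = UNKNOWN_OFFSET;
        for (Rec r : recordsInOffsetOrder) {
            // Strictly greater, so an earlier record wins a timestamp tie.
            if (offsetOfMax == UNKNOWN_OFFSET || r.timestampMs > maxTimestampMs) {
                maxTimestampMs = r.timestampMs;
                offsetOfMax = r.offset;
            }
        }
        return offsetOfMax;
    }
}
```

When three single-record batches are appended with the clock advanced between them, the timestamps increase and the result is the last offset. When all three records share one timestamp, the result is the first offset.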



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-07 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041650425

   > There are quite a few test failures on 
[kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15621/35/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_21_and_Scala_2_13___testResponseIncludesLeaderEpoch__/).
   
   yep, I have fixed it locally and will update the PR later. Thanks for the 
reminder :)





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-07 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041649750

   @chia7712 : There are quite a few test failures on 
[kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15621/35/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_21_and_Scala_2_13___testResponseIncludesLeaderEpoch__/).





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-06 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1554638802


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)

Review Comment:
   I fixed it in 
https://github.com/apache/kafka/pull/15621/commits/8b1005e4385dae14901d5b07fcf365e49bf93127






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-06 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1554155328


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)

Review Comment:
   This is an existing issue. But it seems that the epoch of MAX_TIMESTAMP was 
never implemented correctly. This should be the epoch corresponding to the 
returned offset and is not necessarily the latest epoch. We could either fix it 
here or in a separate PR.
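
A minimal sketch of the distinction, assuming the leader epoch cache can be modelled as a sorted map from each epoch's start offset to the epoch. `EpochForOffsetSketch` and its methods are hypothetical and only illustrate the lookup; they are not Kafka's actual leader epoch cache.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class EpochForOffsetSketch {
    // Start offset of each epoch -> epoch number (hypothetical model of the cache).
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    // Records that the given epoch begins at startOffset.
    void assign(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    // What the code under review returns: the most recent epoch, whatever the offset.
    Optional<Integer> latestEpoch() {
        return epochByStartOffset.isEmpty()
            ? Optional.empty()
            : Optional.of(epochByStartOffset.lastEntry().getValue());
    }

    // What the review asks for: the epoch whose range contains the returned offset.
    Optional<Integer> epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

If epoch 0 starts at offset 0 and epoch 1 starts at offset 10, a max-timestamp record at offset 5 belongs to epoch 0 even though the latest epoch is 1.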






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-06 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041118676

   ```
   Build / JDK 21 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15927
   
   ```
   Build / JDK 21 and Scala 2.13 / testRemoteLogReaderMetrics() – 
kafka.server.ReplicaManagerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16481
   ```
   Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-8115
   ```
   Build / JDK 17 and Scala 2.13 / 
testIOExceptionDuringCheckpoint(String).quorum=kraft – 
kafka.server.LogDirFailureTest
   Build / JDK 11 and Scala 2.13 / 
testIOExceptionDuringLogRoll(String).quorum=kraft – 
kafka.server.LogDirFailureTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16234
   
   ```
   Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15898
   ```
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-16136
   ```
   Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – 
kafka.api.ConsumerBounceTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15146
   ```
   Build / JDK 8 and Scala 2.12 / 
testSendOffsetsToTransactionTimeout(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   Build / JDK 11 and Scala 2.13 / 
testAbortTransactionTimeout(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   ```
   https://issues.apache.org/jira/browse/KAFKA-15772
   
   ok, it seems all the failed flaky tests have jiras now. Waiting for more reviews.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040270805

   > Regarding the previous failed tests, one possibility is that the data on 
the server passed the retention time and is garbage collected. The default 
retention time is 7 days, which should be long enough. However, since we reuse 
mockTime, if the test runs long, the retention time might still be reached. 
Perhaps we could set 
[log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms)
 to -1?
   
   ```
   org.opentest4j.AssertionFailedError: expected: <0> but was: <3>
   ```
   
   You really hit the bullseye. I can reproduce the error by adding a short 
sleep before fetching data. I will set `retention.ms` to -1.
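To illustrate the retention explanation above, here is a minimal sketch. It is a simplified model and not Kafka's real retention code: `RetentionSketch`, `StoredRecord`, `sampleLog` and `earliestOffset` are hypothetical names, and expiry is modelled per record rather than per segment. It shows why advancing a shared mock clock past the retention window can turn the expected earliest offset 0 into 3, and why a retention of -1 avoids that.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of time-based retention; not Kafka's real classes.
final class RetentionSketch {
    /** A record stamped with the (mock) time at which it was appended. */
    static final class StoredRecord {
        final long offset;
        final long timestampMs;

        StoredRecord(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /** Three records at offsets 0..2, all stamped at mock time 0. */
    static List<StoredRecord> sampleLog() {
        return Arrays.asList(
            new StoredRecord(0, 0L), new StoredRecord(1, 0L), new StoredRecord(2, 0L));
    }

    /**
     * Returns the earliest offset still readable at nowMs. If every record has
     * expired, the earliest offset equals the log end offset.
     * A negative retentionMs (for example -1) means "retain forever".
     */
    static long earliestOffset(List<StoredRecord> log, long retentionMs, long nowMs) {
        long logEndOffset = log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
        for (StoredRecord r : log) {
            boolean expired = retentionMs >= 0 && nowMs - r.timestampMs > retentionMs;
            if (!expired) {
                return r.offset;
            }
        }
        return logEndOffset;
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;
        long mockNowMs = sevenDaysMs + 1; // the shared mock clock ran past the window
        System.out.println(earliestOffset(sampleLog(), sevenDaysMs, mockNowMs)); // prints 3
        System.out.println(earliestOffset(sampleLog(), -1L, mockNowMs));         // prints 0
    }
}
```

With the 7-day window all three records expire and the earliest offset becomes the log end offset, which matches the `expected: <0> but was: <3>` failure quoted above.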
   





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-05 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040231361

   @chia7712 : Thanks for the updated PR. Regarding the previous failed tests, 
one possibility is that the data on the server passed the retention time and is 
garbage collected. The default retention time is 7 days, which should be long 
enough. However, since we reuse mockTime, if the test runs long, the retention 
time might still be reached. Perhaps we could set 
[log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms)
 to -1?





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2038916109

   The previously failing tests no longer fail. Rebasing to trigger QA again.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552358023


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +215,56 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+def leader(): Int = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id()
+
+val previousLeader = leader()
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
previousLeader).get
+
+// change the leader to new one
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+// wait for all reassignments get completed
+waitForAllReassignmentsToComplete(adminClient)
+// make sure we are able to see the new leader
+var lastLeader = leader()

Review Comment:
   It seems that we could just initialize with -1?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2038064405

   @junrao Thanks for the reviews. I have removed the unneeded log statement and 
revised the test. Let us see what happens.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552264764


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -483,12 +484,13 @@ public int recover(ProducerStateManager 
producerStateManager, Optional maxTimestampSoFar()) {
 maxTimestampAndOffsetSoFar = new 
TimestampOffset(batch.maxTimestamp(), batch.lastOffset());
+System.out.println("[CHIA] recovery: " + 
maxTimestampAndOffsetSoFar);

Review Comment:
   Is this just for testing?



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +215,52 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+def leader(): Int = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id()
+
+val previousLeader = leader()
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
previousLeader).get
+
+// change the leader to new one
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+// wait for all reassignments get completed
+waitForAllReassignmentsToComplete(adminClient)
+// make sure we are able to see the new leader
+TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: 
$newLeader but actual: ${leader()}")

Review Comment:
   The leader could change in the error message by calling leader() again. 
Could we save the last leader and use that in the error message?
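The suggestion above can be sketched as follows. This is a hedged illustration in plain Java, not the Scala test code from the PR: `WaitForLeaderSketch` and `waitForLeader` are hypothetical names, and a real test would sleep between polls and use the project's own wait helper. The point is that the failure message reports the value that was actually compared, and that the saved value can start from a `-1` sentinel.

```java
import java.util.function.IntSupplier;

// Hypothetical polling helper; it only illustrates "save the last leader".
final class WaitForLeaderSketch {
    /**
     * Polls currentLeader until it equals expectedLeader or attempts run out.
     * On failure the message uses the last observed value instead of reading
     * the leader again, so the reported value cannot differ from the one checked.
     */
    static int waitForLeader(IntSupplier currentLeader, int expectedLeader, int maxAttempts) {
        int lastLeader = -1; // sentinel: no leader observed yet
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            lastLeader = currentLeader.getAsInt();
            if (lastLeader == expectedLeader) {
                return lastLeader;
            }
        }
        throw new AssertionError(
            "expected leader: " + expectedLeader + " but actual: " + lastLeader);
    }
}
```

A caller would pass a supplier that queries the cluster once per poll, so the lookup is written in one place and the error text stays consistent with the check.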






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037697246

   > 
(kafka.admin.ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch(String).quorum=kraft)
 failed with the following.
   
   I'm trying to reproduce it locally :(





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552022713


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +220,49 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
+
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.
+// we make a follower be the new leader to handle the ListOffsetRequest
+val partitionAssignment = 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0)
+val newLeader = brokers.map(_.config.brokerId).find(_ != 
partitionAssignment.leader().id()).get
+
adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new 
TopicPartition(topic, 0),
+  Optional.of(new 
NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get()
+waitForAllReassignmentsToComplete(adminClient)
+TestUtils.waitUntilTrue(() => newLeader == 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id(), 
"expected leader: " + newLeader
+  + ", but actual leader: " + 
adminClient.describeTopics(java.util.Collections.singletonList(topic))
+  .allTopicNames().get().get(topic).partitions().get(0).leader().id())

Review Comment:
   This is a bit hard to read now. Could we do `adminClient.describeTopics` 
once and reuse the result?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-04 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037601997

   Merged trunk to trigger QA again. Also, the error seems to happen because the 
leader did not change. I will check it later.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550393556


##
clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java:
##
@@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable {
  * @return Whether this is a batch containing control records
  */
 boolean isControlBatch();
+
+/**
+ * iterate all records to find the offset of max timestamp.
+ * noted:
+ * 1) that the earliest offset will return if there are multi records 
having same (max) timestamp
+ * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to 
{@link RecordBatch#MAGIC_VALUE_V0}

Review Comment:
   return => returns






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550271336


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -56,11 +60,38 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-setOldMessageFormat = false
+version = RecordBatch.MAGIC_VALUE_V2
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
+val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topicName)
+assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, 
maxTimestampOffset.offset())
+assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 
maxTimestampOffset.timestamp())
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testListVersion0(quorum: String): Unit = {

Review Comment:
   @junrao This is the new test case for version 0: we should get `-1` if the 
magic value is 0.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270774


##
clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java:
##
@@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable {
  * @return Whether this is a batch containing control records
  */
 boolean isControlBatch();
+
+/**
+ * iterate all records to find the offset of max timestamp.
+ * noted:
+ * 1) that the earliest offset will return if there are multi records 
having same (max) timestamp
+ * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to 
{@link RecordBatch#MAGIC_VALUE_V0}
+ * @return offset of max timestamp
+ */
+default Optional offsetOfMaxTimestamp() {
+if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty();

Review Comment:
   @junrao The short-circuit has been added.
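The contract described in the Javadoc of the quoted hunk can be sketched like this. It is an illustration under stated assumptions, not the PR's implementation: `OffsetOfMaxTimestampSketch` and `SimpleRecord` are hypothetical, the constants only mirror the names in the diff, and `OptionalLong` is my choice for the sketch.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for a record batch; not Kafka's RecordBatch.
final class OffsetOfMaxTimestampSketch {
    static final byte MAGIC_VALUE_V0 = 0; // mirrors the constant named in the diff
    static final long NO_TIMESTAMP = -1L; // assumed sentinel for "no timestamp"

    static final class SimpleRecord {
        final long offset;
        final long timestamp;

        SimpleRecord(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /**
     * Iterates all records to find the offset of the max timestamp.
     * 1) The earliest offset wins when several records share the max timestamp.
     * 2) Magic v0 has no timestamps, so the result is always empty.
     */
    static OptionalLong offsetOfMaxTimestamp(byte magic, List<SimpleRecord> records) {
        if (magic == MAGIC_VALUE_V0) {
            return OptionalLong.empty(); // short-circuit before touching the records
        }
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMax = -1L;
        for (SimpleRecord r : records) {
            if (r.timestamp > maxTimestamp) { // strict '>' keeps the earliest offset on ties
                maxTimestamp = r.timestamp;
                offsetOfMax = r.offset;
            }
        }
        return offsetOfMax < 0 ? OptionalLong.empty() : OptionalLong.of(offsetOfMax);
    }
}
```

The strict comparison is what implements rule 1; using `>=` would return the latest offset among ties instead.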






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270383


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+latestTimestampSegment.log.batchesFrom(position.position).asScala

Review Comment:
   I have addressed the comment in 
https://github.com/apache/kafka/pull/15621/commits/4785371c54e2fc2c540895ffe2f94829449937e6






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2035323125

   > There were 69 test failures and quite a few of them related to ListOffset
   
   Another PR (https://github.com/apache/kafka/pull/15489) encounters the same 
error: listing offsets returns an incorrect offset. I'm digging into it ...





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550055646


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+latestTimestampSegment.log.batchesFrom(position.position).asScala

Review Comment:
   In the case of magic=0, we will find latestTimestampSegment with 
NO_TIMESTAMP. If we go through the rest of the logic, it seems that we will 
return the first offset instead of -1. Perhaps we should short-circuit if 
latestTimestampSegment is NO_TIMESTAMP?
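A rough sketch of the short-circuit suggested above, at the segment level. Everything here is hypothetical (`MaxTimestampLookupSketch`, `Segment` and its fields are simplified stand-ins for the real log segment), and the `NO_TIMESTAMP` value of -1 is an assumption of the sketch.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model of "pick the segment with the largest max timestamp".
final class MaxTimestampLookupSketch {
    static final long NO_TIMESTAMP = -1L; // assumed sentinel, as with magic v0 data

    static final class Segment {
        final long baseOffset;
        final long maxTimestampSoFar;
        final long offsetOfMaxTimestampSoFar;

        Segment(long baseOffset, long maxTimestampSoFar, long offsetOfMaxTimestampSoFar) {
            this.baseOffset = baseOffset;
            this.maxTimestampSoFar = maxTimestampSoFar;
            this.offsetOfMaxTimestampSoFar = offsetOfMaxTimestampSoFar;
        }
    }

    /** Returns the offset of the max timestamp, or empty when no segment has a timestamp. */
    static OptionalLong lookup(List<Segment> segments) {
        Segment latest = null;
        for (Segment s : segments) {
            if (latest == null || s.maxTimestampSoFar > latest.maxTimestampSoFar) {
                latest = s;
            }
        }
        // Short-circuit: without it, a log holding only magic v0 data would fall
        // through and report an offset even though no timestamp exists.
        if (latest == null || latest.maxTimestampSoFar == NO_TIMESTAMP) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(latest.offsetOfMaxTimestampSoFar);
    }
}
```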






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2034230452

   The failed tests pass locally.
   ```
   ./gradlew cleanTest \
     :streams:test --tests SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart \
     :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout \
     :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests DedicatedMirrorIntegrationTest.testMultiNodeCluster \
       --tests MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault \
       --tests MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithFrequentOffsetSyncs \
       --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs \
     :core:test --tests ListOffsetsIntegrationTest.testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer \
       --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInOneBatch \
       --tests ListOffsetsIntegrationTest.testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer \
       --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInOneBatch \
       --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch \
       --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInSeparateBatch \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests ConsumerBounceTest.testCloseDuringRebalance \
       --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
       --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
     :clients:test --tests KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse
   ```





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548765182


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Sure. 
https://github.com/apache/kafka/pull/15621/commits/8a7ed30692bd070fb4160a6cbc76a868484529c3
 returns None and adds a related test.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548687661


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Got it. If there is no timestamp index, we initialize 
`maxTimestampAndOffsetSoFar` to `TimestampOffset(RecordBatch.NO_TIMESTAMP, 
baseOffset())`. That's why it picks up the base offset. However, it doesn't 
seem intuitive for the user. Returning None seems better.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548654826


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   If it is fine to change the behavior, we can just return None to build the 
response with unknown offset/timestamp.
   
   
https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/core/src/main/scala/kafka/server/KafkaApis.scala#L1146
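A small sketch of what "return None to build the response with unknown offset/timestamp" could look like. This is not the code behind the link above: `ListOffsetsResultSketch`, `TimestampAndOffset` and `toResponseFields` are hypothetical, and the `-1` values for the two `UNKNOWN_*` constants are an assumption of the sketch.

```java
import java.util.Optional;

// Hypothetical mapping from an optional lookup result to response fields.
final class ListOffsetsResultSketch {
    static final long UNKNOWN_OFFSET = -1L;    // assumed value for the sketch
    static final long UNKNOWN_TIMESTAMP = -1L; // assumed value for the sketch

    static final class TimestampAndOffset {
        final long timestamp;
        final long offset;

        TimestampAndOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /** Returns {timestamp, offset}; an empty result maps to the unknown markers. */
    static long[] toResponseFields(Optional<TimestampAndOffset> found) {
        return found
            .map(t -> new long[] {t.timestamp, t.offset})
            .orElseGet(() -> new long[] {UNKNOWN_TIMESTAMP, UNKNOWN_OFFSET});
    }
}
```

The design choice is that the lookup itself never invents a fallback such as the segment's base offset; only the response-building step decides how "not found" is encoded.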






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548644261


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   I am OK with returning -1. However, it seems that we previously returned the 
base offset when the max timestamp could not be found (no batch exists). Hence, 
the main reason for returning the base offset is backward compatibility.
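
The two fallback options discussed here (base offset versus -1) differ only in the value returned when no batch matches. A self-contained Java sketch of that lookup follows. It is not the `UnifiedLog` code: `Batch`, `lookup`, and `UNKNOWN` are hypothetical names introduced for illustration.

```java
import java.util.List;

final class MaxTimestampLookupSketch {
    // Hypothetical sentinel for "no timestamp / no offset".
    static final long UNKNOWN = -1L;

    // Hypothetical stand-in for a record batch: its max timestamp and the offset of that record.
    static final class Batch {
        final long maxTimestamp;
        final long offsetOfMaxTimestamp;

        Batch(long maxTimestamp, long offsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
        }
    }

    // Returns {timestamp, offset} for the first batch whose max timestamp equals the target.
    // When no batch matches, returns {UNKNOWN, fallbackOffset}; the caller picks the fallback.
    static long[] lookup(List<Batch> batches, long targetMaxTimestamp, long fallbackOffset) {
        for (Batch batch : batches) {
            if (batch.maxTimestamp == targetMaxTimestamp) {
                return new long[] {batch.maxTimestamp, batch.offsetOfMaxTimestamp};
            }
        }
        return new long[] {UNKNOWN, fallbackOffset};
    }
}
```

Passing the segment base offset as `fallbackOffset` models the backward-compatible choice; passing `UNKNOWN` models returning -1 for both fields.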






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548633965


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Hmm, still not sure about this. If we can't find the maxTimestamp, 
intuitively, it seems that we should return -1 for both timestamp and offset?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548530489


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return 
something for backward compatibility

Review Comment:
   > why do we need to return offset=0 when we can't find the maxTimestamp?
   
   Oh, my bad. We should return the base offset instead of zero.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548487542


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return 
something for backward compatibility

Review Comment:
   Hmm, why do we need to return offset=0 when we can't find the maxTimestamp?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548438900


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return 
something for backward compatibility

Review Comment:
   @showuon your previous comment is right (sorry, I can't find the comment, 
but I remember it). We need to return `offset=0 and ts=-1` if there are no 
batches, for the sake of backward compatibility.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548427028


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:
+ * 1) version 0: The max timestamp is NO_TIMESTAMP (-1)
+ * 2) LogAppendTime: All records have same timestamp, and so the max 
timestamp is equal to logAppendTime
+ * 3) CreateTime: The max timestamp of record
+ * 
+ * Let's talk about OffsetOfMaxTimestamp. There are some paths that we 
don't try to find the OffsetOfMaxTimestamp
+ * to avoid expensive records iteration. Those paths include follower 
append and index recovery. In order to
+ * avoid inconsistent time index, we let all paths find 
shallowOffsetOfMaxTimestamp instead of OffsetOfMaxTimestamp.
+ * 
+ * Let's define the shallowOffsetOfMaxTimestamp: It is last offset of the 
batch having max timestamp. If there are
+ * many batches having same max timestamp, we pick up the earliest batch.
+ * 
+ * There are five cases of finding shallowOffsetOfMaxTimestamp to return:
+ * 1) version 0: It is always the -1
+ * 2) LogAppendTime with single batch: It is the offset of last record
+ * 3) LogAppendTime with many single-record batches: Those single-record 
batches have same max timestamp, so the
+ *   base offset is equal 
with the last offset of earliest batch

Review Comment:
   so the base offset is equal with the last offset of earliest batch => so we 
return the base offset, which is equal to the last offset of earliest batch






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548390558


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:

Review Comment:
   Hi @junrao, I rewrote the whole comment to list all cases. Please take a look 
at it, thanks!






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r154830


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.

Review Comment:
   Still not very accurate. For message format version 0, the offset will be 
-1. For message format version 1, the offset will be the first offset.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.
+ * 
+ * If create time is used, the offset will be the last offset unless no 
compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the 
record with the max timestamp.

Review Comment:
   For message format version 0, the offset will be -1. For message format 
version 1, the offset will be the offset of the record with the max timestamp.
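
One way to read the per-version rules in the two comments above, as a small Java sketch. It is not the `MemoryRecordsBuilder#info` implementation: the class name, method name, and parameters are hypothetical, and the "compressed or v2" branch follows the "single batch, use the last offset" rule from the javadoc under review.

```java
final class ShallowOffsetCasesSketch {
    // Hypothetical sentinel for "no offset", matching the -1 used in the review.
    static final long NO_OFFSET = -1L;

    // Picks the offset reported alongside the max timestamp.
    // magic: message format version (0, 1, or 2).
    // compressed: whether a compression type other than NONE is used.
    // logAppendTime: true for LogAppendTime, false for CreateTime.
    static long shallowOffsetOfMaxTimestamp(int magic,
                                            boolean compressed,
                                            boolean logAppendTime,
                                            long baseOffset,
                                            long lastOffset,
                                            long offsetOfMaxTimestampRecord) {
        if (magic == 0) {
            // Format version 0 carries no timestamps, so there is no offset to report.
            return NO_OFFSET;
        }
        if (compressed || magic >= 2) {
            // Everything is in a single batch, so the batch's last offset is used.
            return lastOffset;
        }
        // Format version 1 without compression: each record is its own batch.
        // LogAppendTime gives every record the same timestamp, so the first offset wins;
        // CreateTime uses the offset of the record that holds the max timestamp.
        return logAppendTime ? baseOffset : offsetOfMaxTimestampRecord;
    }
}
```

Checking version 0 first keeps that case from falling into either the "last offset" or the "first offset" branch, which is the inaccuracy the comments point out.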
   
   






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2032614122

   @junrao thanks for additional reviews. I have addressed them except 
https://github.com/apache/kafka/pull/15621#discussion_r1548230600





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548258165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -434,7 +442,8 @@ public ValidationResult 
validateMessagesAndAssignOffsetsCompressed(LongRef offse
 now,
 records,
 maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be 
viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
   If the magic is 0, we don't reach this path. Instead, 
`buildRecordsAndAssignOffsets` handles version 0. See 
https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L343






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548211287


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch which having max timestamp.

Review Comment:
   which having => having



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,14 +293,29 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
+// those checks should be equal to MemoryRecordsBuilder#info
+switch (toMagic) {
+case RecordBatch.MAGIC_VALUE_V0:
+// value will be the default value: -1
+shallowOffsetOfMaxTimestamp = -1;

Review Comment:
   maxTimestamp should be NO_TIMESTAMP if magic is 0.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -434,7 +442,8 @@ public ValidationResult 
validateMessagesAndAssignOffsetsCompressed(LongRef offse
 now,
 records,
 maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be 
viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
   If magic is 0, we should set both maxTimestamp and 
shallowOffsetOfMaxTimestamp to -1.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch which having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the 
first offset.

Review Comment:
   For message format 0, offset is always -1. Ditto below.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-01 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2030856554

   @junrao thanks for the reviews. I have addressed all comments and added a new 
test, `testLogAppendTimeNonCompressedV0`, to cover v0.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-01 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546994725


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef 
offsetCounter,
 
 if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && 
maxBatchTimestamp > maxTimestamp) {
 maxTimestamp = maxBatchTimestamp;
-offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
   > Do we have test cases covering that?
   
   There is already a unit test, and I will add more tests to cover it.
   
   
https://github.com/apache/kafka/blob/cc6b919212ae62d75850214ae2c93379b78ff325/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala#L408






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-01 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546966401


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##
@@ -259,7 +259,7 @@ public String toString() {
 ", lastOffset=" + lastOffset +
 ", lastLeaderEpoch=" + lastLeaderEpoch +
 ", maxTimestamp=" + maxTimestamp +
-", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp +
+", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +

Review Comment:
   offsetOfMaxTimestamp => shallowOffsetOfMaxTimestamp



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -68,17 +68,20 @@ public static class ValidationResult {
 public final long logAppendTimeMs;
 public final MemoryRecords validatedRecords;
 public final long maxTimestampMs;
-public final long offsetOfMaxTimestampMs;
+// we only maintain batch level offset for max timestamp since we want 
to align the behavior of updating time
+// indexing entries. The paths of follower sync and replica recovery 
do not iterate all records, so they have no

Review Comment:
   follower sync => follower append



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef 
offsetCounter,
 
 if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && 
maxBatchTimestamp > maxTimestamp) {
 maxTimestamp = maxBatchTimestamp;
-offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
   We need to set shallowOffsetOfMaxTimestamp to the last offset in the batch. 
Do we have test cases covering that?
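
The batch-level rule requested here (use the last offset of the batch, and keep the earliest batch on ties) can be sketched in plain Java. This is an illustration only, not the `LogValidator` code; the class name, the method name, and the `{maxTimestamp, lastOffset}` row layout are assumptions made for the example.

```java
final class ShallowOffsetScanSketch {
    // Each row describes one batch in log order as {maxTimestamp, lastOffset}.
    // Returns {maxTimestamp, shallowOffsetOfMaxTimestamp}, or {-1, -1} if nothing qualifies.
    static long[] scan(long[][] batches) {
        long maxTimestamp = -1L;
        long shallowOffsetOfMaxTimestamp = -1L;
        for (long[] batch : batches) {
            long batchMaxTimestamp = batch[0];
            long batchLastOffset = batch[1];
            // Strict comparison: a later batch with an equal timestamp does not replace
            // the earlier one, so the earliest batch with the max timestamp is kept.
            if (batchMaxTimestamp > maxTimestamp) {
                maxTimestamp = batchMaxTimestamp;
                // Record the batch's last offset, not the offset of an individual record.
                shallowOffsetOfMaxTimestamp = batchLastOffset;
            }
        }
        return new long[] {maxTimestamp, shallowOffsetOfMaxTimestamp};
    }
}
```

Because only per-batch metadata is read, the same scan could in principle run on paths that do not iterate individual records, which is the alignment goal stated in the javadoc under review.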



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##
@@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset,
 this.lastOffset = lastOffset;
 this.lastLeaderEpoch = lastLeaderEpoch;
 this.maxTimestamp = maxTimestamp;
-this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
+this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;

Review Comment:
   Could we rename offsetOfMaxTimestamp in the input param and the javadoc 
accordingly? It would be useful to rename the local val in 
`UnifiedLog.analyzeAndValidateRecords` too.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,14 +296,20 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+// case 0: there is only one batch so use the last offset
+shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
+else
+// case 1: Those single-record batches have same max 
timestamp, so the initial offset is equal with
+// the last offset of earliest batch
+shallowOffsetOfMaxTimestamp = initialOffset;

Review Comment:
   For MAGIC_VALUE_V0, shallowOffsetOfMaxTimestamp should be -1.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-01 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2030454105

   @junrao thanks for all your reviews and patience. All comments are addressed.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-01 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546692495


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -232,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) 
throws IOException {
  *
  * @param largestOffset The last offset in the message set
  * @param largestTimestampMs The largest timestamp in the message set.
- * @param offsetOfMaxTimestamp The offset of the message that has the 
largest timestamp in the messages to append.
+ * @param shallowOffsetOfMaxTimestamp The offset of the message that has 
the largest timestamp in the messages to append.

Review Comment:
   Could we make the comment clear that the last offset is in the batch?



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,39 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch which having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the 
first offset.
+ * 
+ * If create time is used, the offset will be the last offset unless no 
compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the 
record with the max timestamp.
  *
  * @return The max timestamp and its offset
  */
 public RecordsInfo info() {
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
-return new RecordsInfo(logAppendTime, baseOffset);
+if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
+// case 0: there is only one batch so use the last offset
+return new RecordsInfo(logAppendTime, lastOffset);
+else
+// case 1: there are many single-record batches having same 
max timestamp, so the base offset is
+// equal with the last offset of earliest batch
+return new RecordsInfo(logAppendTime, baseOffset);
+} else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
-// If it's MAGIC_VALUE_V0, the value will be the default value: 
[-1, -1]
-return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
+// ditto to case 0
+return new RecordsInfo(maxTimestamp, lastOffset);
+else
+// case 2: Each batch is composed of single record, and 
offsetOfMaxTimestamp points to the record having
+// max timestamp. Hence, offsetOfMaxTimestamp is equal to the 
last offset of earliest batch (record)

Review Comment:
   of earliest batch => of earliest batch with max timestamp?
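
The rule spelled out in the Javadoc above can be read as a small decision function. The following is a minimal sketch, not the actual `MemoryRecordsBuilder` code: the class name `ShallowOffsetRule`, its parameter list, and the use of plain `boolean`/`int` values in place of `TimestampType`, `CompressionType`, and the magic constants are assumptions made for illustration. The `NO_TIMESTAMP` branch from the diff is left out.

```java
// Illustrative sketch only; the class, method, and parameter names are hypothetical.
final class ShallowOffsetRule {
    static final int MAGIC_VALUE_V2 = 2; // assumed to mirror RecordBatch.MAGIC_VALUE_V2

    static long shallowOffsetOfMaxTimestamp(boolean logAppendTime,
                                            boolean compressed,
                                            int magic,
                                            long baseOffset,
                                            long lastOffset,
                                            long offsetOfMaxTimestamp) {
        // Compression or message format v2+ yields a single batch,
        // so the last offset of that batch is reported.
        if (compressed || magic >= MAGIC_VALUE_V2) {
            return lastOffset;
        }
        // Uncompressed v0/v1: every record is its own single-record batch.
        // With log append time all records share one timestamp, so the earliest wins.
        // With create time, the record holding the max timestamp is itself the batch.
        return logAppendTime ? baseOffset : offsetOfMaxTimestamp;
    }
}
```

Under this reading, only the uncompressed v0/v1 case can return anything other than `lastOffset`.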



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,14 +292,13 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
 }
 
 return new ValidationResult(
 now,
 records,
 maxTimestamp,
-offsetOfMaxTimestamp,
+shallowOffsetOfMaxTimestamp,

Review Comment:
   We need to add the following code back, right?
   
   ```
   if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
       offsetOfMaxTimestamp = offsetCounter.value - 1;
   }
   ```
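
As a worked example of why `offsetCounter.value - 1` is the last offset of the batch: the counter holds the next offset to hand out, so once every record has been assigned an offset it sits one past the last record. The sketch below is hypothetical; `OffsetCounterDemo` and its `Counter` class are stand-ins for illustration and are not Kafka's `LongRef`.

```java
// Illustrative sketch only; Counter is a hypothetical stand-in for LongRef.
final class OffsetCounterDemo {
    static final class Counter {
        long value; // the next offset to assign

        Counter(long initial) {
            this.value = initial;
        }

        long getAndIncrement() {
            return value++;
        }
    }

    /** Assigns offsets to recordCount (>= 1) records and returns the last assigned offset. */
    static long lastAssignedOffset(Counter offsetCounter, int recordCount) {
        for (int i = 0; i < recordCount; i++) {
            offsetCounter.getAndIncrement();
        }
        // The counter now points one past the final record of the batch.
        return offsetCounter.value - 1;
    }
}
```

Starting from offset 10 with three records, the records receive offsets 10, 11, and 12, the counter ends at 13, and the function returns 12.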



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,39 @@ public MemoryRecords build() {
  

Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-31 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2029213022

   Rebased the code and applied Luke's patch from 
https://github.com/chia7712/kafka/pull/3/files





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-29 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027904152

   >  I think both are important. First, it's important to be able to derive 
the same thing consistently from the leader and the follower log. This affects 
things like the time indexing entries. It will be confusing if the leader adds 
an offset in the middle of a batch while the follower adds an offset at the end 
of the batch. Second, it's important to name things as accurately as possible. 
Otherwise, future developers could make inaccurate assumptions.
   
   You are right. I have reverted the implementation and the naming. I also 
added extra comments for the "spec" of offsetOfMaxTimestamp.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-29 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027526301

   > Do we need to revert all of them? The paths we fixed work well now.
   > 
   > 1. It seems to me adding comments for both "recover" and "follower" cases 
can remind readers that this offsetOfMaxTimestampMs is shallow.
   > 2. or we can only rename offsetForMaxTimestamp back to 
shallowOffsetMaxTimestamp but we keep the implementation.
   > 
   
   @chia7712 : I think both are important. First, it's important to be able to 
derive the same thing consistently from the leader and the follower log. This 
affects things like the time indexing entries. It will be confusing if the 
leader adds an offset in the middle of a batch while the follower adds an 
offset at the end of the batch. Second, it's our responsibility to name things as 
accurately as possible. Otherwise, future developers could make inaccurate 
assumptions.
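
To illustrate the distinction under discussion, the sketch below contrasts a record-level ("deep") offset of the max timestamp with the batch-level ("shallow") one. It is a simplified model rather than Kafka code: `DeepVsShallow` and the array-based batch representation are assumptions. The shallow value needs only batch-level metadata (the last offset), which is why it can be derived the same way on the leader and on the follower without iterating records.

```java
// Illustrative sketch only; a batch is modelled as a base offset plus per-record timestamps.
final class DeepVsShallow {
    /**
     * Offset of the first record carrying the max timestamp.
     * Requires reading every record; timestamps must be non-empty.
     */
    static long deepOffsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        int best = 0;
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] > timestamps[best]) { // strict '>' keeps the first record on ties
                best = i;
            }
        }
        return baseOffset + best;
    }

    /** Last offset of the batch; derivable from batch metadata alone. */
    static long shallowOffsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        return baseOffset + timestamps.length - 1;
    }
}
```

For a batch at base offset 0 with timestamps {10, 30, 20}, the deep offset is 1 and the shallow offset is 2.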





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-29 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1544283639


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -189,14 +190,47 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, 
expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), 
topic)
+  assertEquals(0, earliestOffset.offset())
 
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), 
topic)
+  assertEquals(3, latestOffset.offset())
 
-val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+  val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
+
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.

Review Comment:
   @junrao the extra tests are added. please take a look






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2026641230

   > (1) revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp
   (2) change/revert the implementation to set shallowOffsetMaxTimestamp 
accordingly.
   
   Do we need to revert all of them? The paths we fixed work well now. 
   1) It seems to me adding comments for both "recover" and "follower" cases 
can remind readers that this `offsetOfMaxTimestampMs` is shallow.
   2) or we can only rename `offsetForMaxTimestamp` back to 
`shallowOffsetMaxTimestamp` but we keep the implementation.
   
   @junrao WDYT?
   
   > (3) add tests for follower appends
   
   will complete it later





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1544046500


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   >We could stop after finding the first batch matching maxTimestamp.
   
   Oh, sorry for neglecting that.
   
   > Although it should be impossible, should we handle empty() case?
   
   In that case we can just let `KafkaApis` return an error response with `UNKNOWN`:
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1146






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543916425


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   Although it should be impossible, should we handle `empty()` case?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543906755


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   Good suggestion. We should use `find` here.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543754297


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   We could stop after finding the first batch matching maxTimestamp.
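
The difference between filtering every batch and stopping at the first match can be sketched with Java streams, where `findFirst` short-circuits. This is a generic illustration and not the `UnifiedLog` code: `FirstMatchDemo`, the `long[]` of per-batch max timestamps, and the inspection counter are assumptions introduced here.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Illustrative sketch only; batches are modelled by their max timestamps.
final class FirstMatchDemo {
    /**
     * Returns the index of the first batch whose max timestamp equals target,
     * counting in `inspected` how many batches were looked at along the way.
     */
    static OptionalInt firstBatchWith(long[] batchMaxTimestamps, long target, AtomicInteger inspected) {
        return IntStream.range(0, batchMaxTimestamps.length)
                .peek(i -> inspected.incrementAndGet())
                .filter(i -> batchMaxTimestamps[i] == target)
                .findFirst(); // stops pulling batches once a match is found
    }
}
```

An empty `OptionalInt` is returned when no batch matches, so the caller decides how to handle that case.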






[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 opened a new pull request, #15621:
URL: https://github.com/apache/kafka/pull/15621

   When handling `ListOffsetsRequest.MAX_TIMESTAMP`, we iterate the records to 
find the `offsetOfMaxTimestamp` instead of returning the cached one, since it 
is hard to align all paths so that they produce the correct 
`offsetOfMaxTimestamp`. The known paths are listed below.
   
   1. `convertAndAssignOffsetsNonCompressed` -> we CAN get the correct 
`offsetOfMaxTimestamp` when validating all records
   2. `assignOffsetsNonCompressed` -> ditto
   3. `validateMessagesAndAssignOffsetsCompressed` -> ditto
   4. `validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets` 
-> ditto
   5. `appendAsFollower#append#analyzeAndValidateRecords` -> we CAN'T get the 
correct `offsetOfMaxTimestamp`, as iterating all records is expensive when 
fetching records from the leader
   6. `LogSegment#recover` -> ditto
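
The approach described above (ignore the cached offset and re-derive it from the records at query time) can be sketched as follows. This is a simplified in-memory model, not the broker code: `MaxTimestampLookup`, its `Segment` class, and the array of per-record timestamps are assumptions. Only the per-segment max timestamp is trusted as cached state; the offset is found by scanning the chosen segment.

```java
import java.util.List;

// Illustrative sketch only; a segment is modelled as a base offset plus record timestamps.
final class MaxTimestampLookup {
    static final class Segment {
        final long baseOffset;
        final long[] timestamps;      // one entry per record, in offset order
        final long maxTimestampSoFar; // cached per segment, cheap to read

        Segment(long baseOffset, long... timestamps) {
            this.baseOffset = baseOffset;
            this.timestamps = timestamps;
            long max = Long.MIN_VALUE;
            for (long t : timestamps) {
                max = Math.max(max, t);
            }
            this.maxTimestampSoFar = max;
        }
    }

    /** Returns {timestamp, offset} of the first record with the max timestamp, or null if there are no records. */
    static long[] lookup(List<Segment> segments) {
        // Step 1: pick the segment with the largest cached max timestamp (earliest on ties).
        Segment latest = null;
        for (Segment s : segments) {
            if (s.timestamps.length > 0
                    && (latest == null || s.maxTimestampSoFar > latest.maxTimestampSoFar)) {
                latest = s;
            }
        }
        if (latest == null) {
            return null;
        }
        // Step 2: scan only that segment and stop at the first record with that timestamp.
        for (int i = 0; i < latest.timestamps.length; i++) {
            if (latest.timestamps[i] == latest.maxTimestampSoFar) {
                return new long[] {latest.maxTimestampSoFar, latest.baseOffset + i};
            }
        }
        return null; // not reachable: the cached max always comes from some record
    }
}
```

Because the offset is recomputed on every request, the result does not depend on which append path wrote the cached offset.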
   
   https://issues.apache.org/jira/browse/KAFKA-16310
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026013673

   > I am just saying that we only need to fix this in trunk since the 
implementation was never correct in any previous branches, thus not a 
regression.
   
   got it. will open another PR for trunk 





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 closed pull request #15618: KAFKA-16310 ListOffsets doesn't report the 
offset with maxTimestamp a…
URL: https://github.com/apache/kafka/pull/15618





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026000277

   > I am not sure I understand this. All we need for this solution (or 
workaround) is the "max timestamp" of a segment, since we always iterate the 
batches (from the segment having the max timestamp) to find the "offset" of max 
timestamp when handling the ListOffsetsRequest.MAX_TIMESTAMP. Hence, we can 
correct the implementation for all active branches (including 3.6.3) with this PR.
   
   Yes, I agree that we can fix this issue completely. I am just saying that we 
only need to fix this in trunk since the implementation was never correct in 
any previous branches, thus not a regression.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543560539


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   Yes. Suppose you have a 1GB segment and the maxTimestamp is in the last 
batch. `latestTimestampSegment.log.batches()` needs to read 1GB from disk. 
Using the offsetIndex, we only need to read the index and about 
`index.interval.bytes` (4KB by default) worth of log data.
   
   > Is the impl of lookup like this?
   
   Yes, that's what I was thinking.
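
The saving described above comes from the offset index being sparse: it maps some offsets to file positions, so a lookup finds the closest indexed offset at or below the target, and only the bytes from that position onward need to be read. The following is a minimal sketch under assumed names (`SparseOffsetIndex`, `add`, `lookup`); a `TreeMap` stands in for the real index file, and this is not Kafka's `OffsetIndex` implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; an in-memory stand-in for a sparse offset index.
final class SparseOffsetIndex {
    private final TreeMap<Long, Integer> entries = new TreeMap<>(); // offset -> file position

    void add(long offset, int position) {
        entries.put(offset, position);
    }

    /**
     * Returns the file position of the largest indexed offset <= targetOffset,
     * or 0 (the start of the segment) when no such entry exists.
     */
    int lookup(long targetOffset) {
        Map.Entry<Long, Integer> floor = entries.floorEntry(targetOffset);
        return floor == null ? 0 : floor.getValue();
    }
}
```

A reader would then iterate batches starting at the returned position instead of at the beginning of the segment.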






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025943523

   > Since the follower only maintains offsetForMaxTimestamp at the batch 
level, the listMaxTimestamp API was never implemented correctly.
   
   I am not sure I understand this. All we need for this solution (or 
workaround) is the "max timestamp" of a segment, since we always iterate the 
batches (from the segment having the max timestamp) to find the "offset" of max 
timestamp when handling the `ListOffsetsRequest.MAX_TIMESTAMP`. Hence, we can 
correct the implementation for all active branches (including 3.6.3) with this PR.
   
   > So, technically, there was no regression for listMaxTimestamp. It seems 
there is no need to fix this in the 3.6? We could just fix it in trunk.
   
   BTW, I'm OK with keeping the current behavior for 3.6, as it is not a "regression".
   





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543505409


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   > latestTimestampSegment.log.batches() scans the whole log segment and could 
introduce unnecessary extra I/O. So, there could be performance degradation 
because of that.
   
   `batches` is an `Iterable`, and its implementation loads a batch only 
when we call `next`. 
https://github.com/apache/kafka/blob/3.6/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java#L63
   
   Hence, the benefit of looking up a batch (finding the position and then 
using it to call `batchesFrom`) is that we can save some I/O by skipping some 
batches. Please correct me if I have misunderstood anything.
   
   > I am not sure I understand this. Looking up a batch by either baseOffset 
or lastOffset will locate the same batch using the offset index, right?
   
   Is the impl of lookup like this?
   ```scala
   val position = latestTimestampSegment.offsetIndex.lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
   latestTimestampSegment.log.batchesFrom(position.position).asScala
   ```






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025853565

   @chia7712: Since the follower only maintains offsetForMaxTimestamp at the 
batch level, the listMaxTimestamp API was never implemented correctly. So, 
technically, there was no regression for listMaxTimestamp. It seems there is no 
need to fix this in the 3.6? We could just fix it in trunk. 





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543430656


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   @chia7712: `latestTimestampSegment.log.batches()` scans the whole log 
segment and could introduce unnecessary extra I/O. So, there could be 
performance degradation because of that. 
   
   > Hence we have to use condition baseOffset <= offset <= lastOffset to find 
batch.
   
   I am not sure I understand this. Looking up a batch by either baseOffset 
or lastOffset will locate the same batch using the offset index, right?






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543390343


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   @junrao thanks for the feedback. I use the max timestamp to find the 
"first" batch instead of using the offset index. 
   
   It seems to me that using the max timestamp is simpler, since the offset 
stored by `maxTimestampAndOffsetSoFar` could be either the last offset or the 
offset of the max timestamp. Hence we have to use the condition 
`baseOffset <= offset <= lastOffset` to find the batch.
   
   I'm OK with using the offset if using the max timestamp to find the first 
batch has any side effects.
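
The condition `baseOffset <= offset <= lastOffset` mentioned above can be sketched as a scan that returns the first batch whose offset range contains the target. `BatchRangeLookup` and its `Batch` class are hypothetical stand-ins for illustration, not Kafka's `RecordBatch`.

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch only; a batch is modelled by its inclusive offset range.
final class BatchRangeLookup {
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }

        boolean contains(long offset) {
            return baseOffset <= offset && offset <= lastOffset;
        }
    }

    /** Returns the first batch whose range covers offset, or empty if none does. */
    static Optional<Batch> findContaining(List<Batch> batches, long offset) {
        for (Batch batch : batches) {
            if (batch.contains(offset)) {
                return Optional.of(batch);
            }
        }
        return Optional.empty();
    }
}
```

With this range check, a cached offset resolves to the same batch whether it is the batch's last offset or the offset of a record in the middle of it.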






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543349555


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   Hmm, iterating all batches can be expensive. We could use the offset index 
to find the batch containing the offset in maxTimestampAndOffsetSoFar.






[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 opened a new pull request, #15618:
URL: https://github.com/apache/kafka/pull/15618

   - add default implementation to Batch to return offset of max timestamp
   - copy ListOffsetsIntegrationTest from trunk branch
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16310: ListOffsets doesn't report the offset with maxTimestamp anymore [kafka]

2024-03-04 Thread via GitHub


KevinZTW commented on PR #15461:
URL: https://github.com/apache/kafka/pull/15461#issuecomment-1976256564

   @showuon many thanks for helping me look into this! Based on the information 
on the Jira ticket, it seems that other developers have already found the 
root cause and are going to submit a fix. Sorry, I forgot to close this PR.





Re: [PR] KAFKA-16310: ListOffsets doesn't report the offset with maxTimestamp anymore [kafka]

2024-03-04 Thread via GitHub


KevinZTW closed pull request #15461: KAFKA-16310: ListOffsets doesn't report 
the offset with maxTimestamp anymore
URL: https://github.com/apache/kafka/pull/15461





Re: [PR] KAFKA-16310: ListOffsets doesn't report the offset with maxTimestamp anymore [kafka]

2024-03-04 Thread via GitHub


showuon commented on PR #15461:
URL: https://github.com/apache/kafka/pull/15461#issuecomment-1976165637

   This is not an easy one. I need more time to dig into it. Thanks.





Re: [PR] KAFKA-16310: ListOffsets doesn't report the offset with maxTimestamp anymore [kafka]

2024-03-03 Thread via GitHub


showuon commented on PR #15461:
URL: https://github.com/apache/kafka/pull/15461#issuecomment-1975543018

   Will check it today. Thanks.





[PR] KAFKA-16310: ListOffsets doesn't report the offset with maxTimestamp anymore [kafka]

2024-03-03 Thread via GitHub


KevinZTW opened a new pull request, #15461:
URL: https://github.com/apache/kafka/pull/15461

   When messages with timestamps are added through a producer, fetching the 
offset with the max timestamp does not return the expected offset.
   
   jira ticket: https://issues.apache.org/jira/browse/KAFKA-16310
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

