showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611046740


##########
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##########
@@ -144,6 +144,35 @@ class LogSegmentTest {
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+    val maxPosition: Optional[java.lang.Long] = Optional.empty()
+    val maxSize = 1
+    val seg = createSegment(40)
+    val ms = records(50, "hello", "there")
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    for (minOneMessage <- Array(true, false)) {

Review Comment:
   Could we use `@ParameterizedTest` to test it? Otherwise, if the test fails, 
it's not clear which case is causing the failure.
   ```
     @ParameterizedTest
     @ValueSource(booleans = Array(true, false))
   ```



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
     assertEquals(31, log.localLogStartOffset())
   }
 
+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch = 0)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 3 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(30)
+
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(31, log.localLogStartOffset())
+
+    log.updateLogStartOffsetFromRemoteTier(15)
+    assertEquals(15, log.logStartOffset)
+
+    // case-1: offset is higher than the local-log-start-offset.
+    // log-start-offset < local-log-start-offset < offset-to-be-converted < 
log-end-offset
+    assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35))
+    // case-2: offset is lesser than the local-log-start-offset

Review Comment:
   less than



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4211,6 +4211,46 @@ class UnifiedLogTest {
     assertEquals(31, log.localLogStartOffset())
   }
 
+  @Test
+  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch = 0)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 3 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(30)
+
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(31, log.localLogStartOffset())
+
+    log.updateLogStartOffsetFromRemoteTier(15)
+    assertEquals(15, log.logStartOffset)
+
+    // case-1: offset is higher than the local-log-start-offset.
+    // log-start-offset < local-log-start-offset < offset-to-be-converted < 
log-end-offset
+    assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35))
+    // case-2: offset is lesser than the local-log-start-offset
+    // log-start-offset < offset-to-be-converted < local-log-start-offset < 
log-end-offset
+    assertEquals(new LogOffsetMetadata(29, -1L, -1), 
log.maybeConvertToOffsetMetadata(29))
+    // case-3: offset is higher than the log-end-offset
+    // log-start-offset < local-log-start-offset < log-end-offset < 
offset-to-be-converted
+    assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1), 
log.maybeConvertToOffsetMetadata(log.logEndOffset + 1))
+    // case-4: offset is lesser than the log-start-offset

Review Comment:
   nit: lesser than -> less than



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##########
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
 
     // check if this offset is already on an older segment compared with the 
given offset
     public boolean onOlderSegment(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
-            throw new KafkaException(this + " cannot compare its segment info 
with " + that + " since it only has message offset info");
-
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset < that.segmentBaseOffset;
     }
 
     // check if this offset is on the same segment with the given offset
-    private boolean onSameSegment(LogOffsetMetadata that) {
+    public boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly() || that.messageOffsetOnly())
+            return false;
         return this.segmentBaseOffset == that.segmentBaseOffset;
     }
 
     // compute the number of bytes between this offset to the given offset
     // if they are on the same segment and this offset precedes the given 
offset
     public int positionDiff(LogOffsetMetadata that) {
-        if (messageOffsetOnly())
+        if (messageOffsetOnly() || that.messageOffsetOnly())
             throw new KafkaException(this + " cannot compare its segment 
position with " + that + " since it only has message offset info");
-        if (!onSameSegment(that))
+        if (this.segmentBaseOffset != that.segmentBaseOffset)

Review Comment:
   nit: Why can't we use `!onSameSegment(that)` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to