kamalcph commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1395379325


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3911,6 +3911,121 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testDeletableSegmentsFilter(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    for (_ <- 0 to 8) {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.roll()
+    }
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+
+    assertEquals(10, log.logSegments.size())
+
+    {
+      val deletable = log.deletableSegments(
+        (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
+      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
+      assertEquals(6, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
+      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.toList
+      assertEquals(9, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
+      val expected = log.logSegments.asScala.toList
+      assertEquals(10, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+  }
+
+  @Test
+  def testDeletableSegmentsIteration(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    for (_ <- 0 to 8) {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.roll()
+    }
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+
+    assertEquals(10, log.logSegments.size())
+
+    var offset = 0
+    val deletableSegments = log.deletableSegments(
+      (segment: LogSegment, nextSegmentOpt: Option[LogSegment]) => {
+        assertEquals(offset, segment.baseOffset)
+        val logSegments = new LogSegments(log.topicPartition)
+        log.logSegments.forEach(segment => logSegments.add(segment))
+        val floorSegmentOpt = logSegments.floorSegment(offset)
+        assertTrue(floorSegmentOpt.isPresent)
+        assertEquals(floorSegmentOpt.get, segment)
+        if (offset == log.logEndOffset) {
+          assertFalse(nextSegmentOpt.isDefined)
+        } else {
+          assertTrue(nextSegmentOpt.isDefined)
+          val higherSegmentOpt = logSegments.higherSegment(segment.baseOffset)
+          assertTrue(higherSegmentOpt.isPresent)
+          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
+          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
+        }
+        offset += 1
+        true
+      })
+    assertEquals(10L, log.logSegments.size())
+    assertEquals(log.nonActiveLogSegmentsFrom(0L).asScala.toSeq, 
deletableSegments.toSeq)
+  }
+
+  @Test
+  def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage(): 
Unit = {
+    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
segmentIndexBytes = 12,
+      retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0, 
remoteLogStorageEnable = true)

Review Comment:
   `localRetentionMs` is set to 1 ms, once it passed the active segment will be 
rolled over and offloaded to remote storage.
   
   For `localRetentionBytes`, the 
[UnifiedLog#deleteRetentionSizeBreachedSegments](https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1502-1513)
 skips the active segment from the size calculation so we cannot assert based 
on it and we don't want to change the _deleteRetentionSizeBreachedSegments_ 
behaviour.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to