junrao commented on code in PR #17193:
URL: https://github.com/apache/kafka/pull/17193#discussion_r1772299474


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -623,8 +623,12 @@ private[log] class Cleaner(val id: Int,
 
     val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), 
log.config.segmentSize,
       log.config.maxIndexSize, cleanable.firstUncleanableOffset)
-    for (group <- groupedSegments)
-      cleanSegments(log, group, offsetMap, currentTime, stats, 
transactionMetadata, legacyDeleteHorizonMs)
+    val groupIter = groupedSegments.iterator
+    while (groupIter.hasNext) {
+      val group = groupIter.next
+      val retainLastBatch = !groupIter.hasNext
+      cleanSegments(log, group, offsetMap, currentTime, stats, 
transactionMetadata, legacyDeleteHorizonMs, retainLastBatch)

Review Comment:
   It's probably simpler to just pass `upperBoundOffset` to `cleanInto` and 
check whether it equals a batch's nextOffset.



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1358,10 +1358,39 @@ class LogCleanerTest extends Logging {
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     assertThrows(classOf[LogCleaningAbortedException], () =>
       cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 
0L, new CleanerStats(),
-        new CleanedTransactionMetadata, -1)
+        new CleanedTransactionMetadata, -1, retainLastBatch = true)
     )
   }
 
+  @Test
+  def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    // append messages to the log until we have four segments
+    while (log.numberOfSegments < 4)
+      log.appendAsLeader(record(log.logEndOffset.toInt, 
log.logEndOffset.toInt), leaderEpoch = 0)
+    val keysFound = LogTestUtils.keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend all keys are deleted
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val segments = log.logSegments.asScala.take(3).toSeq
+    val stats = new CleanerStats()
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new 
CleanedTransactionMetadata, -1, retainLastBatch = true)
+    assertEquals(2, log.logSegments.size)
+    assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, 
"one batch should be retained in the cleaned segment")
+    val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
+    assertEquals(log.logSegments.asScala.last.baseOffset - 1, 
retainedBatch.lastOffset, "the retained batch should be the last batch")
+    assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 
retainedBatch.sizeInBytes, "the retained batch should be an empty batch")

Review Comment:
   To test whether a batch is empty, it's probably better to just iterate over the batch's records and verify there are none.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to