junrao commented on code in PR #17193: URL: https://github.com/apache/kafka/pull/17193#discussion_r1772299474
########## core/src/main/scala/kafka/log/LogCleaner.scala: ########## @@ -623,8 +623,12 @@ private[log] class Cleaner(val id: Int, val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset) - for (group <- groupedSegments) - cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs) + val groupIter = groupedSegments.iterator + while (groupIter.hasNext) { + val group = groupIter.next + val retainLastBatch = !groupIter.hasNext + cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, retainLastBatch) Review Comment: It's probably simpler to just pass `upperBoundOffset` to `cleanInto` and check if it equals a batch's nextOffset. ########## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ########## @@ -1358,10 +1358,39 @@ class LogCleanerTest extends Logging { keys.foreach(k => map.put(key(k), Long.MaxValue)) assertThrows(classOf[LogCleaningAbortedException], () => cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 0L, new CleanerStats(), - new CleanedTransactionMetadata, -1) + new CleanedTransactionMetadata, -1, retainLastBatch = true) ) } + @Test + def testCleanSegmentsRetainingLastEmptyBatch(): Unit = { + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + + // append messages to the log until we have four segments + while (log.numberOfSegments < 4) + log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0) + val keysFound = LogTestUtils.keysInLog(log) + assertEquals(0L until log.logEndOffset, keysFound) + + // pretend all keys are deleted + val map = new FakeOffsetMap(Int.MaxValue) + keysFound.foreach(k => map.put(key(k), Long.MaxValue)) + + 
// clean the log + val segments = log.logSegments.asScala.take(3).toSeq + val stats = new CleanerStats() + cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, retainLastBatch = true) + assertEquals(2, log.logSegments.size) + assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, "one batch should be retained in the cleaned segment") + val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head + assertEquals(log.logSegments.asScala.last.baseOffset - 1, retainedBatch.lastOffset, "the retained batch should be the last batch") + assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, retainedBatch.sizeInBytes, "the retained batch should be an empty batch") Review Comment: To test whether a batch is empty, it's probably better to just iterate over the batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org