jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r582407274
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########

```diff
@@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int,
         }
         currentSegmentOpt = nextSegmentOpt
       }
-
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
-
-      // update the modification date to retain the last modified date of the original files
-      val modified = segments.last.lastModified
-      cleaned.lastModified = modified
-
-      // swap in new segment
-      info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
-      log.replaceSegments(List(cleaned), segments)
+
+      // Result of cleaning included at least one record.
+      if (cleanedSegment.isDefined) {
+        val cleaned = cleanedSegment.get
+        cleaned.onBecomeInactiveSegment()
+        // flush new segment to disk before swap
+        cleaned.flush()
+
+        // update the modification date to retain the last modified date of the original files
+        val modified = segments.last.lastModified
+        cleaned.lastModified = modified
+
+        // swap in new segment
+        info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
+        log.replaceSegments(List(cleaned), segments)
```

Review comment:

As for the first part about `replaceSegments`: it seems there are a few other places where we call this method. I'm wondering whether we would want to update the logStartOffset in those cases too (probably), and what the reason should be. I'm also wondering whether we should pass the reason as a parameter to `replaceSegments`.
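To make the suggestion concrete, here is a minimal, hypothetical sketch of what threading a `reason` parameter through `replaceSegments` could look like, with the logStartOffset update folded in. The `Segment` and `Log` types below are stand-ins invented for illustration, not the actual Kafka classes, and the real `Log.replaceSegments` does not take a `reason` argument; this only sketches the shape of the idea under discussion.

```scala
// Hypothetical stand-ins for illustration only (not Kafka's real classes).
final case class Segment(baseOffset: Long)

class Log(var segments: List[Segment], var logStartOffset: Long) {
  // `reason` is the parameter proposed in the review: each call site would
  // state why the swap happened (cleaning, splitting, recovery, ...), so the
  // log message and any logStartOffset change are self-documenting.
  def replaceSegments(newSegments: List[Segment],
                      oldSegments: List[Segment],
                      reason: String): Unit = {
    segments = newSegments ++ segments.filterNot(oldSegments.contains)
    // If cleaning dropped every record below the first surviving segment,
    // the caller may also want to advance logStartOffset (the open question
    // in the comment). Only ever move it forward, never backwards.
    if (segments.nonEmpty) {
      val newStart = segments.map(_.baseOffset).min
      if (newStart > logStartOffset) logStartOffset = newStart
    }
    println(s"Replaced ${oldSegments.size} segment(s) with " +
      s"${newSegments.size} segment(s): $reason")
  }
}

// Example: cleaning collapses two segments into one starting at offset 100,
// so logStartOffset advances from 0 to 100.
val log = new Log(List(Segment(0L), Segment(100L)), 0L)
log.replaceSegments(List(Segment(100L)),
  List(Segment(0L), Segment(100L)), "log cleaning")
```

One design question this surfaces is whether the offset update belongs inside `replaceSegments` at all, or whether each caller should decide; passing the reason at least makes every call site explicit about intent.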