[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503847#comment-17503847 ]
xiongqi wu commented on KAFKA-13723: ------------------------------------ [~junrao] Hi Jun, this function is supposed to capture violation that pass-by the max compaction delay. e.g, if maxCompactionDelay > 0, which mean it has violated the policy (e.g, the log is not compacted within the maxCompaction config time), and the log should be compact immediately. if maxCompactionDelay = 0, not violation found, and the log doesn't need to be compacted immediately. The name is a little bit misleading. maxCompactionDelay doesn't mean log cleaner should delay util compaction. Instead, it means the delay already happened, and it should be cleaned immediately. > max.compaction.lag.ms implemented incorrectly > --------------------------------------------- > > Key: KAFKA-13723 > URL: https://issues.apache.org/jira/browse/KAFKA-13723 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.3.0 > Reporter: Jun Rao > Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced > max.compaction.lag.ms to guarantee that a record be cleaned before a certain > time. > > The implementation in LogCleanerManager has the following code. The path for > earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, > it seems that we should set the delay to 0 so that we could trigger cleaning > immediately since the segment has been dirty for longer than > max.compaction.lag.ms. > > > {code:java} > def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : > Long = { > ... > val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) > val cleanUntilTime = now - maxCompactionLagMs > if (earliestDirtySegmentTimestamp < cleanUntilTime) > cleanUntilTime - earliestDirtySegmentTimestamp > else > 0L > }{code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)