[
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Bradstreet updated KAFKA-13194:
-------------------------------------
Priority: Minor (was: Major)
> LogCleaner may clean past highwatermark
> ---------------------------------------
>
> Key: KAFKA-13194
> URL: https://issues.apache.org/jira/browse/KAFKA-13194
> Project: Kafka
> Issue Type: Bug
> Reporter: Lucas Bradstreet
> Priority: Minor
>
> Here the cleaning point is bounded by the active segment base offset and the
> first unstable offset, which makes sense:
>
> {code:java}
> // find first segment that cannot be cleaned
> // neither the active segment, nor segments with any messages closer to the
> // head of the log than the minimum compaction lag time may be cleaned
> val firstUncleanableDirtyOffset: Long = Seq(
>
>   // we do not clean beyond the first unstable offset
>   log.firstUnstableOffset,
>
>   // the active segment is always uncleanable
>   Option(log.activeSegment.baseOffset),
>
>   // the first segment whose largest message timestamp is within a minimum
>   // time lag from now
>   if (minCompactionLagMs > 0) {
>     // dirty log segments
>     val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
>     dirtyNonActiveSegments.find { s =>
>       val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
>       debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
>         s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
>         s"is uncleanable=$isUncleanable")
>       isUncleanable
>     }.map(_.baseOffset)
>   } else None
> ).flatten.min
> {code}
>
> But the LSO starts out as None, so flatten drops it from the candidates above:
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
> private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
> {code}
> Most code that depends on the LSO goes through fetchLastStableOffsetMetadata,
> which defaults it to the high watermark (hwm) if it is not set.
>
> {code:java}
> private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
>   checkIfMemoryMappedBufferClosed()
>
>   // cache the current high watermark to avoid a concurrent update
>   // invalidating the range check
>   val highWatermarkMetadata = fetchHighWatermarkMetadata
>
>   firstUnstableOffsetMetadata match {
>     case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
>       if (offsetMetadata.messageOffsetOnly) {
>         lock synchronized {
>           val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
>           if (firstUnstableOffsetMetadata.contains(offsetMetadata))
>             firstUnstableOffsetMetadata = Some(fullOffset)
>           fullOffset
>         }
>       } else {
>         offsetMetadata
>       }
>     case _ => highWatermarkMetadata
>   }
> }
> {code}
>
>
> This means that in the case where the hwm is prior to the active segment
> base, the log cleaner may clean past the hwm. This is most likely to occur
> after a broker restart when the log cleaner may start cleaning prior to
> replication becoming active.
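
To make the failure mode concrete, here is a minimal, self-contained sketch of the bound computation described above. It is not the Kafka code: plain Long and Option[Long] values stand in for the log state, and the names CleanerBoundSketch, uncleanableBound, lastStableOffset and boundedByHighWatermark are hypothetical. The second variant shows one possible way to bound the result, assuming the hwm default used by fetchLastStableOffsetMetadata is applied to the cleaner's candidates as well; it is not necessarily how the issue should be fixed.

```scala
object CleanerBoundSketch {

  // Same shape as the quoted Seq(...).flatten.min expression.
  // A None firstUnstableOffset is silently dropped by flatten.
  def uncleanableBound(firstUnstableOffset: Option[Long],
                       activeSegmentBaseOffset: Long,
                       firstLaggingSegmentBaseOffset: Option[Long]): Long =
    Seq(
      firstUnstableOffset,
      Option(activeSegmentBaseOffset),
      firstLaggingSegmentBaseOffset
    ).flatten.min

  // Hypothetical helper: default to the high watermark when the first
  // unstable offset is unset or not below the high watermark, following
  // the match in fetchLastStableOffsetMetadata (offsets only, no metadata).
  def lastStableOffset(firstUnstableOffset: Option[Long], highWatermark: Long): Long =
    firstUnstableOffset.filter(_ < highWatermark).getOrElse(highWatermark)

  // Hypothetical variant: use the defaulted value as a candidate, so the
  // result can never exceed the high watermark.
  def boundedByHighWatermark(firstUnstableOffset: Option[Long],
                             highWatermark: Long,
                             activeSegmentBaseOffset: Long,
                             firstLaggingSegmentBaseOffset: Option[Long]): Long =
    Seq(
      Option(lastStableOffset(firstUnstableOffset, highWatermark)),
      Option(activeSegmentBaseOffset),
      firstLaggingSegmentBaseOffset
    ).flatten.min
}
```

For example, with no unstable offset, a hwm of 100 and an active segment base offset of 500, uncleanableBound(None, 500L, None) yields 500, which is past the hwm, while boundedByHighWatermark(None, 100L, 500L, None) yields 100.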
--
This message was sent by Atlassian Jira
(v8.3.4#803005)