[
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Bradstreet updated KAFKA-13194:
-------------------------------------
Description:
Here we have the cleaning point being bounded to the active segment base offset
and the first unstable offset, which makes sense:
{code:java}
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the
head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq( // we do not clean beyond
the first unstable offset
log.firstUnstableOffset, // the active segment is always uncleanable
Option(log.activeSegment.baseOffset), // the first segment whose
largest message timestamp is within a minimum time lag from now
if (minCompactionLagMs > 0) {
// dirty log segments
val dirtyNonActiveSegments =
log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.find { s =>
val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
debug(s"Checking if log segment may be cleaned: log='${log.name}'
segment.baseOffset=${s.baseOffset} " +
s"segment.largestTimestamp=${s.largestTimestamp}; now -
compactionLag=${now - minCompactionLagMs}; " +
s"is uncleanable=$isUncleanable")
isUncleanable
}.map(_.baseOffset)
} else None
).flatten.min
{code}
But the LSO (last stable offset) starts out as None.
{code:java}
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] =
None
private[log] def firstUnstableOffset: Option[Long] =
firstUnstableOffsetMetadata.map(_.messageOffset){code}
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to
default it to the hwm if it's not set.
{code:java}
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed() // cache the current high watermark to
avoid a concurrent update invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata
firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset <
highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
val fullOffset =
convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
}
} else {
offsetMetadata
}
case _ => highWatermarkMetadata
}
}
{code}
This means that in the case where the hwm is prior to the active segment base,
the log cleaner may clean past the hwm. This is most likely to occur after a
broker restart when the log cleaner may start cleaning prior to replication
becoming active.
was:
Here we have the cleaning point being bounded to the active segment base offset
and the first unstable offset, which makes sense:
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the
head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(
// we do not clean beyond the first unstable offset
log.firstUnstableOffset,
// the active segment is always uncleanable
Option(log.activeSegment.baseOffset),
// the first segment whose largest message timestamp is within a minimum
time lag from now
if (minCompactionLagMs > 0) {
// dirty log segments
val dirtyNonActiveSegments =
log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.find { s =>
val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
debug(s"Checking if log segment may be cleaned: log='${log.name}'
segment.baseOffset=${s.baseOffset} " +
s"segment.largestTimestamp=${s.largestTimestamp}; now -
compactionLag=${now - minCompactionLagMs}; " +
s"is uncleanable=$isUncleanable")
isUncleanable
}.map(_.baseOffset)
} else None
).flatten.min
But LSO starts out as None.
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata]
= None
private[log] def firstUnstableOffset: Option[Long] =
firstUnstableOffsetMetadata.map(_.messageOffset)
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to
default it to the hwm if it's not set.
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed()
// cache the current high watermark to avoid a concurrent update
invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata
firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset <
highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
val fullOffset =
convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
}
} else {
offsetMetadata
}
case _ => highWatermarkMetadata
}
}
This means that in the case where the hwm is prior to the active segment base,
the log cleaner may clean past the hwm. This is most likely to occur after a
broker restart when the log cleaner may start cleaning prior to replication
becoming active.
> LogCleaner may clean past highwatermark
> ---------------------------------------
>
> Key: KAFKA-13194
> URL: https://issues.apache.org/jira/browse/KAFKA-13194
> Project: Kafka
> Issue Type: Bug
> Reporter: Lucas Bradstreet
> Priority: Major
>
> Here we have the cleaning point being bounded to the active segment base
> offset and the first unstable offset. Which makes sense:
>
> {code:java}
> // find first segment that cannot be cleaned
> // neither the active segment, nor segments with any messages closer to
> the head of the log than the minimum compaction lag time
> // may be cleaned
> val firstUncleanableDirtyOffset: Long = Seq( // we do not clean
> beyond the first unstable offset
> log.firstUnstableOffset, // the active segment is always
> uncleanable
> Option(log.activeSegment.baseOffset), // the first segment whose
> largest message timestamp is within a minimum time lag from now
> if (minCompactionLagMs > 0) {
> // dirty log segments
> val dirtyNonActiveSegments =
> log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
> dirtyNonActiveSegments.find { s =>
> val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
> debug(s"Checking if log segment may be cleaned: log='${log.name}'
> segment.baseOffset=${s.baseOffset} " +
> s"segment.largestTimestamp=${s.largestTimestamp}; now -
> compactionLag=${now - minCompactionLagMs}; " +
> s"is uncleanable=$isUncleanable")
> isUncleanable
> }.map(_.baseOffset)
> } else None
> ).flatten.min
> {code}
>
> But LSO starts out as None.
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata]
> = None
> private[log] def firstUnstableOffset: Option[Long] =
> firstUnstableOffsetMetadata.map(_.messageOffset){code}
> For most code depending on the LSO, fetchLastStableOffsetMetadata is used to
> default it to the hwm if it's not set.
>
> {code:java}
> private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
> checkIfMemoryMappedBufferClosed() // cache the current high watermark
> to avoid a concurrent update invalidating the range check
> val highWatermarkMetadata = fetchHighWatermarkMetadata
> firstUnstableOffsetMetadata match {
> case Some(offsetMetadata) if offsetMetadata.messageOffset <
> highWatermarkMetadata.messageOffset =>
> if (offsetMetadata.messageOffsetOnly) {
> lock synchronized {
> val fullOffset =
> convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
> if (firstUnstableOffsetMetadata.contains(offsetMetadata))
> firstUnstableOffsetMetadata = Some(fullOffset)
> fullOffset
> }
> } else {
> offsetMetadata
> }
> case _ => highWatermarkMetadata
> }
> }
> {code}
>
>
> This means that in the case where the hwm is prior to the active segment
> base, the log cleaner may clean past the hwm. This is most likely to occur
> after a broker restart when the log cleaner may start cleaning prior to
> replication becoming active.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)