Lucas Bradstreet created KAFKA-13194:
----------------------------------------

             Summary: LogCleaner may clean past highwatermark
                 Key: KAFKA-13194
                 URL: https://issues.apache.org/jira/browse/KAFKA-13194
             Project: Kafka
          Issue Type: Bug
            Reporter: Lucas Bradstreet


Here the cleaning point is bounded by the active segment base offset and the first unstable offset, which makes sense:
    // find first segment that cannot be cleaned
    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
    // may be cleaned
    val firstUncleanableDirtyOffset: Long = Seq(

      // we do not clean beyond the first unstable offset
      log.firstUnstableOffset,

      // the active segment is always uncleanable
      Option(log.activeSegment.baseOffset),

      // the first segment whose largest message timestamp is within a minimum time lag from now
      if (minCompactionLagMs > 0) {
        // dirty log segments
        val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
        dirtyNonActiveSegments.find { s =>
          val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
            s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
            s"is uncleanable=$isUncleanable")
          isUncleanable
        }.map(_.baseOffset)
      } else None
    ).flatten.min
But the LSO (firstUnstableOffset) starts out as None:
  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
  private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
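To make the consequence concrete, here is a minimal standalone sketch (with made-up offsets, not actual Kafka code) of how the Seq(...).flatten.min computation above behaves when the first unstable offset is None: the None is simply dropped, so nothing ties the cleaning bound to the high watermark.
    // Illustration only, with made-up offsets: when the LSO is None, flatten
    // silently drops it and the cleaning bound falls back to the active
    // segment base offset alone.
    val firstUnstableOffset: Option[Long] = None      // LSO not yet initialized
    val activeSegmentBase: Option[Long] = Some(5000L) // active segment base offset
    val minLagBound: Option[Long] = None              // no min compaction lag bound
    val firstUncleanableDirtyOffset = Seq(firstUnstableOffset, activeSegmentBase, minLagBound).flatten.min
    // firstUncleanableDirtyOffset == 5000, even if the high watermark is only, say, 1200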
Most code that depends on the LSO goes through fetchLastStableOffsetMetadata, which defaults it to the high watermark when it is not set:
  private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
    checkIfMemoryMappedBufferClosed()

    // cache the current high watermark to avoid a concurrent update invalidating the range check
    val highWatermarkMetadata = fetchHighWatermarkMetadata

    firstUnstableOffsetMetadata match {
      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
        if (offsetMetadata.messageOffsetOnly) {
          lock synchronized {
            val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
            if (firstUnstableOffsetMetadata.contains(offsetMetadata))
              firstUnstableOffsetMetadata = Some(fullOffset)
            fullOffset
          }
        } else {
          offsetMetadata
        }
      case _ => highWatermarkMetadata
    }
  }
The log cleaner, however, uses log.firstUnstableOffset directly, so nothing bounds cleaning to the high watermark. This means that when the hwm is prior to the active segment base offset, the log cleaner may clean past the hwm. This is most likely to occur after a broker restart, when the log cleaner may start cleaning before replication becomes active.
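A possible direction for a fix (just a sketch, not a tested patch) is to bound cleaning by the last stable offset rather than by firstUnstableOffset, since the last stable offset already falls back to the high watermark when no unstable offset is known. The sketch below assumes log.lastStableOffset is accessible from LogCleanerManager and uses minCompactionLagBound as a stand-in for the existing min-lag expression:
    // Sketch of a possible fix, not a tested patch.
    val firstUncleanableDirtyOffset: Long = Seq(

      // do not clean beyond the last stable offset, which falls back to the
      // high watermark when firstUnstableOffset has not been established yet
      // (assumes log.lastStableOffset is accessible here)
      Some(log.lastStableOffset),

      // the active segment is always uncleanable
      Option(log.activeSegment.baseOffset),

      // stand-in for the existing minCompactionLagMs bound shown above
      minCompactionLagBound
    ).flatten.min
This keeps the shape of the existing computation while ensuring the cleaning point cannot pass the hwm even before any transactional state is known.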


