This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 4e5b864  KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662)
4e5b864 is described below

commit 4e5b86419982050217d06b3c30ba6236e1fd9090
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Nov 8 19:35:22 2019 -0800

    KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662)
    
    This was a regression in 2.3.1. In the case of a DeleteRecords call, the log start offset may be higher than the active segment base offset. The cleaner should handle this case gracefully.
    
    Reviewers: Jun Rao <jun...@gmail.com>
    
    Co-Authored-By: Tim Van Laer <timvl...@users.noreply.github.com>
---
 core/src/main/scala/kafka/log/Log.scala            | 10 +--
 core/src/main/scala/kafka/log/LogCleaner.scala     | 13 +--
 .../main/scala/kafka/log/LogCleanerManager.scala   | 43 +++++++---
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  4 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 99 ++++++++++++++++------
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  1 +
 6 files changed, 111 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 491a4c4..3291e1d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1918,8 +1918,8 @@ class Log(@volatile var dir: File,
     lock synchronized {
       val view = Option(segments.floorKey(from)).map { floor =>
         if (to < floor)
-          throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments from offset $from " +
-            s"mapping to segment with base offset $floor, which is greater 
than limit offset $to")
+          throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments in $topicPartition " +
+            s"from offset $from mapping to segment with base offset $floor, 
which is greater than limit offset $to")
         segments.subMap(floor, to)
       }.getOrElse(segments.headMap(to))
       view.values.asScala
@@ -1929,9 +1929,9 @@ class Log(@volatile var dir: File,
   def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
     lock synchronized {
       if (from > activeSegment.baseOffset)
-        throw new IllegalArgumentException("Illegal request for non-active 
segments beginning at " +
-          s"offset $from, which is larger than the active segment's base 
offset ${activeSegment.baseOffset}")
-      logSegments(from, activeSegment.baseOffset)
+        Seq.empty
+      else
+        logSegments(from, activeSegment.baseOffset)
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 7e51ff4..47abae5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -435,17 +435,6 @@ object LogCleaner {
       fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, 
preallocate = log.config.preallocate)
   }
 
-  /**
-    * Given the first dirty offset and an uncleanable offset, calculates the 
total cleanable bytes for this log
-    * @return the biggest uncleanable offset and the total amount of cleanable 
bytes
-    */
-  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, 
uncleanableOffset: Long): (Long, Long) = {
-    val firstUncleanableSegment = 
log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
-    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-    val cleanableBytes = log.logSegments(firstDirtyOffset, 
math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
-
-    (firstUncleanableOffset, cleanableBytes)
-  }
 }
 
 /**
@@ -1048,7 +1037,7 @@ private case class LogToClean(topicPartition: 
TopicPartition,
                               uncleanableOffset: Long,
                               needCompactionNow: Boolean = false) extends 
Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
-  val (firstUncleanableOffset, cleanableBytes) = 
LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
+  val (firstUncleanableOffset, cleanableBytes) = 
LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, 
uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
   val cleanableRatio = cleanableBytes / totalBytes.toDouble
   override def compare(that: LogToClean): Int = 
math.signum(this.cleanableRatio - that.cleanableRatio).toInt
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index efa4d94..18bb871 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{Iterable, immutable, mutable}
+import scala.collection.{Iterable, Seq, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -105,8 +105,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
                   val now = Time.SYSTEM.milliseconds
                   partitions.map { tp =>
                     val log = logs.get(tp)
-                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = 
cleanableOffsets(log, tp, lastClean, now)
-                    val (_, uncleanableBytes) = 
LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, 
firstUncleanableDirtyOffset)
+                    val lastCleanOffset = lastClean.get(tp)
+                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = 
cleanableOffsets(log, lastCleanOffset, now)
+                    val (_, uncleanableBytes) = calculateCleanableBytes(log, 
firstDirtyOffset, firstUncleanableDirtyOffset)
                     uncleanableBytes
                   }.sum
                 }
@@ -180,7 +181,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           inProgress.contains(topicPartition) || isUncleanablePartition(log, 
topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
-          val (firstDirtyOffset, firstUncleanableDirtyOffset) = 
cleanableOffsets(log, topicPartition, lastClean, now)
+          val lastCleanOffset = lastClean.get(topicPartition)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) = 
cleanableOffsets(log, lastCleanOffset, now)
           val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, 
now)
           preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
 
@@ -510,15 +512,11 @@ private[log] object LogCleanerManager extends Logging {
     * Returns the range of dirty offsets that can be cleaned.
     *
     * @param log the log
-    * @param lastClean the map of checkpointed offsets
+    * @param lastCleanOffset the last checkpointed offset
     * @param now the current time in milliseconds of the cleaning operation
     * @return the lower (inclusive) and upper (exclusive) offsets
     */
-  def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: 
immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
-
-    // the checkpointed offset, ie., the first offset of the next dirty segment
-    val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
-
+  def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): 
(Long, Long) = {
     // If the log segments are abnormally truncated and hence the checkpointed 
offset is no longer valid;
     // reset to the log starting offset and log the error
     val firstDirtyOffset = {
@@ -534,7 +532,7 @@ private[log] object LogCleanerManager extends Logging {
       } else if (checkpointDirtyOffset > log.logEndOffset) {
         // The dirty offset has gotten ahead of the log end offset. This could 
happen if there was data
         // corruption at the end of the log. We conservatively assume that the 
full log needs cleaning.
-        warn(s"The last checkpoint dirty offset for partition $topicPartition 
is $checkpointDirtyOffset, " +
+        warn(s"The last checkpoint dirty offset for partition ${log.name} is 
$checkpointDirtyOffset, " +
           s"which is larger than the log end offset ${log.logEndOffset}. 
Resetting to the log start offset $logStartOffset.")
         logStartOffset
       } else {
@@ -561,14 +559,31 @@ private[log] object LogCleanerManager extends Logging {
         val dirtyNonActiveSegments = 
log.nonActiveLogSegmentsFrom(firstDirtyOffset)
         dirtyNonActiveSegments.find { s =>
           val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
-          debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} 
segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - 
minCompactionLagMs}; is uncleanable=$isUncleanable")
+          debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} " +
+            s"segment.largestTimestamp=${s.largestTimestamp}; now - 
compactionLag=${now - minCompactionLagMs}; " +
+            s"is uncleanable=$isUncleanable")
           isUncleanable
         }.map(_.baseOffset)
       } else None
     ).flatten.min
 
-    debug(s"Finding range of cleanable offsets for log=${log.name} 
topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => 
firstDirtyOffset=$firstDirtyOffset 
firstUncleanableOffset=$firstUncleanableDirtyOffset 
activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+    debug(s"Finding range of cleanable offsets for log=${log.name}. Last clean 
offset=$lastCleanOffset " +
+      s"now=$now => firstDirtyOffset=$firstDirtyOffset 
firstUncleanableOffset=$firstUncleanableDirtyOffset " +
+      s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+
+    (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset))
+  }
+
+  /**
+   * Given the first dirty offset and an uncleanable offset, calculates the 
total cleanable bytes for this log
+   * @return the biggest uncleanable offset and the total amount of cleanable 
bytes
+   */
+  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, 
uncleanableOffset: Long): (Long, Long) = {
+    val firstUncleanableSegment = 
log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
+    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+    val cleanableBytes = log.logSegments(firstDirtyOffset, 
math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
 
-    (firstDirtyOffset, firstUncleanableDirtyOffset)
+    (firstUncleanableOffset, cleanableBytes)
   }
+
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 2d342fa..b189392 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -84,8 +84,8 @@ class LogCleanerIntegrationTest extends 
AbstractLogCleanerIntegrationTest {
     val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", 
uncleanableDirectory)
 
     TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 
2, "There should be 2 uncleanable partitions", 2000L)
-    val expectedTotalUncleanableBytes = 
LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
-      LogCleaner.calculateCleanableBytes(log2, 0, 
log2.logSegments.last.baseOffset)._2
+    val expectedTotalUncleanableBytes = 
LogCleanerManager.calculateCleanableBytes(log, 0, 
log.logSegments.last.baseOffset)._2 +
+      LogCleanerManager.calculateCleanableBytes(log2, 0, 
log2.logSegments.last.baseOffset)._2
     TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == 
expectedTotalUncleanableBytes,
       s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 
1000L)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 83098bf..309dee8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -177,6 +177,54 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(10L, filthiestLog.firstDirtyOffset)
   }
 
+  @Test
+  def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val log = createLog(segmentSize = 2048, LogConfig.Compact, tp)
+
+    val logs = new Pool[TopicPartition, Log]()
+    logs.put(tp, log)
+
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+
+    assertEquals(1, log.logSegments.size)
+
+    log.maybeIncrementLogStartOffset(2L)
+
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 0L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(2L, filthiestLog.firstDirtyOffset)
+  }
+
+  @Test
+  def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
+    // It is possible in the case of an unclean leader election for the 
checkpoint
+    // dirty offset to get ahead of the active segment base offset, but still 
be
+    // within the range of the log.
+
+    val tp = new TopicPartition("foo", 0)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val log = createLog(2048, LogConfig.Compact, topicPartition = tp)
+    logs.put(tp, log)
+
+    appendRecords(log, numRecords = 3)
+    appendRecords(log, numRecords = 3)
+
+    assertEquals(1, log.logSegments.size)
+    assertEquals(0L, log.activeSegment.baseOffset)
+
+    val cleanerManager = createCleanerManagerMock(logs)
+    cleanerCheckpoints.put(tp, 3L)
+
+    val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+    assertEquals(3L, filthiestLog.firstDirtyOffset)
+  }
+
   /**
     * When checking for logs with segments ready for deletion
     * we shouldn't consider logs where cleanup.policy=delete
@@ -337,8 +385,8 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, 
log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the active 
segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
   }
@@ -367,8 +415,8 @@ class LogCleanerManagerTest extends Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, 
log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with the second block of 
log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
   }
@@ -392,8 +440,8 @@ class LogCleanerManagerTest extends Logging {
 
     time.sleep(compactionLag + 1)
 
-    val lastClean = Map(topicPartition -> 0L)
-    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
topicPartition, lastClean, time.milliseconds)
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
lastCleanOffset, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the 
log.", 0L, cleanableOffsets._1)
     assertEquals("The first uncleanable offset begins with active segment.", 
log.activeSegment.baseOffset, cleanableOffsets._2)
   }
@@ -420,8 +468,7 @@ class LogCleanerManagerTest extends Logging {
 
     time.sleep(compactionLag + 1)
     // although the compaction lag has been exceeded, the undecided data 
should not be cleaned
-    var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, 
topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), 
time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(0L, cleanableOffsets._2)
 
@@ -431,16 +478,14 @@ class LogCleanerManagerTest extends Logging {
     log.onHighWatermarkIncremented(4L)
 
     // the first segment should now become cleanable immediately
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), 
time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(3L, cleanableOffsets._2)
 
     time.sleep(compactionLag + 1)
 
     // the second segment becomes cleanable after the compaction lag
-    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
-      Map(topicPartition -> 0L), time.milliseconds())
+    cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), 
time.milliseconds())
     assertEquals(0L, cleanableOffsets._1)
     assertEquals(4L, cleanableOffsets._2)
   }
@@ -531,26 +576,28 @@ class LogCleanerManagerTest extends Logging {
                            recordsPerBatch: Int,
                            batchesPerSegment: Int): Unit = {
     for (i <- 0 until numBatches) {
-      val startOffset = i * recordsPerBatch
-      val endOffset = startOffset + recordsPerBatch
-      var lastTimestamp = 0L
-      val records = (startOffset until endOffset).map { offset =>
-        val currentTimestamp = time.milliseconds()
-        if (offset == endOffset - 1)
-          lastTimestamp = currentTimestamp
-
-        new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, 
s"value-$offset".getBytes)
-      }
-
-      log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 1)
-      log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset)
-
+      appendRecords(log, recordsPerBatch)
       if (i % batchesPerSegment == 0)
         log.roll()
     }
     log.roll()
   }
 
+  private def appendRecords(log: Log, numRecords: Int): Unit = {
+    val startOffset = log.logEndOffset
+    val endOffset = startOffset + numRecords
+    var lastTimestamp = 0L
+    val records = (startOffset until endOffset).map { offset =>
+      val currentTimestamp = time.milliseconds()
+      if (offset == endOffset - 1)
+        lastTimestamp = currentTimestamp
+      new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, 
s"value-$offset".getBytes)
+    }
+
+    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 1)
+    log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset)
+  }
+
   private def makeLog(dir: File = logDir, config: LogConfig) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, 
scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, 
maxProducerIdExpirationMs = 60 * 60 * 1000,
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a864da0..f62205f 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -355,6 +355,7 @@ class LogTest {
     assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L))
     assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L))
     assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L))
+    assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(6L))
   }
 
   @Test

Reply via email to