This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 4e5b864 KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662) 4e5b864 is described below commit 4e5b86419982050217d06b3c30ba6236e1fd9090 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Fri Nov 8 19:35:22 2019 -0800 KAFKA-9133; Cleaner should handle log start offset larger than active segment base offset (#7662) This was a regression in 2.3.1. In the case of a DeleteRecords call, the log start offset may be higher than the active segment base offset. The cleaner should allow for this case gracefully. Reviewers: Jun Rao <jun...@gmail.com> Co-Authored-By: Tim Van Laer <timvl...@users.noreply.github.com> --- core/src/main/scala/kafka/log/Log.scala | 10 +-- core/src/main/scala/kafka/log/LogCleaner.scala | 13 +-- .../main/scala/kafka/log/LogCleanerManager.scala | 43 +++++++--- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 4 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 99 ++++++++++++++++------ core/src/test/scala/unit/kafka/log/LogTest.scala | 1 + 6 files changed, 111 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 491a4c4..3291e1d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1918,8 +1918,8 @@ class Log(@volatile var dir: File, lock synchronized { val view = Option(segments.floorKey(from)).map { floor => if (to < floor) - throw new IllegalArgumentException(s"Invalid log segment range: requested segments from offset $from " + - s"mapping to segment with base offset $floor, which is greater than limit offset $to") + throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + + s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to") segments.subMap(floor, to) }.getOrElse(segments.headMap(to)) view.values.asScala @@ -1929,9 +1929,9 @@ class Log(@volatile var dir: File, def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = { lock synchronized { if (from > activeSegment.baseOffset) - throw new IllegalArgumentException("Illegal request for non-active segments beginning at " + - s"offset $from, which is larger than the active segment's base offset ${activeSegment.baseOffset}") - logSegments(from, activeSegment.baseOffset) + Seq.empty + else + logSegments(from, activeSegment.baseOffset) } } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 7e51ff4..47abae5 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -435,17 +435,6 @@ object LogCleaner { fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate) } - /** - * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log - * @return the biggest uncleanable offset and the total amount of cleanable bytes - */ - def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { - val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment) - val firstUncleanableOffset = firstUncleanableSegment.baseOffset - val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum - - (firstUncleanableOffset, cleanableBytes) - } } /** @@ -1048,7 +1037,7 @@ private case class LogToClean(topicPartition: TopicPartition, uncleanableOffset: Long, needCompactionNow: Boolean = false) extends Ordered[LogToClean] { val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum - val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset) + val (firstUncleanableOffset, cleanableBytes) = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset) val totalBytes = cleanBytes + cleanableBytes val cleanableRatio = cleanableBytes / totalBytes.toDouble override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index efa4d94..18bb871 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time import org.apache.kafka.common.errors.KafkaStorageException -import scala.collection.{Iterable, immutable, mutable} +import scala.collection.{Iterable, Seq, mutable} private[log] sealed trait LogCleaningState private[log] case object LogCleaningInProgress extends LogCleaningState @@ -105,8 +105,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val now = Time.SYSTEM.milliseconds partitions.map { tp => val log = logs.get(tp) - val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, tp, lastClean, now) - val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset) + val lastCleanOffset = lastClean.get(tp) + val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now) + val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset) uncleanableBytes }.sum } @@ -180,7 +181,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { case (topicPartition, log) => // create a LogToClean instance for each - val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now) + val lastCleanOffset = lastClean.get(topicPartition) + val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now) val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now) preCleanStats.updateMaxCompactionDelay(compactionDelayMs) @@ -510,15 +512,11 @@ private[log] object LogCleanerManager extends Logging { * Returns the range of dirty offsets that can be cleaned. * * @param log the log - * @param lastClean the map of checkpointed offsets + * @param lastCleanOffset the last checkpointed offset * @param now the current time in milliseconds of the cleaning operation * @return the lower (inclusive) and upper (exclusive) offsets */ - def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = { - - // the checkpointed offset, ie., the first offset of the next dirty segment - val lastCleanOffset: Option[Long] = lastClean.get(topicPartition) - + def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): (Long, Long) = { // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid; // reset to the log starting offset and log the error val firstDirtyOffset = { @@ -534,7 +532,7 @@ private[log] object LogCleanerManager extends Logging { } else if (checkpointDirtyOffset > log.logEndOffset) { // The dirty offset has gotten ahead of the log end offset. This could happen if there was data // corruption at the end of the log. We conservatively assume that the full log needs cleaning. - warn(s"The last checkpoint dirty offset for partition $topicPartition is $checkpointDirtyOffset, " + + warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " + s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.") logStartOffset } else { @@ -561,14 +559,31 @@ private[log] object LogCleanerManager extends Logging { val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - minCompactionLagMs - debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; is uncleanable=$isUncleanable") + debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " + + s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " + + s"is uncleanable=$isUncleanable") isUncleanable }.map(_.baseOffset) } else None ).flatten.min - debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}") + debug(s"Finding range of cleanable offsets for log=${log.name}. Last clean offset=$lastCleanOffset " + + s"now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset " + + s"activeSegment.baseOffset=${log.activeSegment.baseOffset}") + + (firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset)) + } + + /** + * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log + * @return the biggest uncleanable offset and the total amount of cleanable bytes + */ + def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { + val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment) + val firstUncleanableOffset = firstUncleanableSegment.baseOffset + val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum - (firstDirtyOffset, firstUncleanableDirtyOffset) + (firstUncleanableOffset, cleanableBytes) } + } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 2d342fa..b189392 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -84,8 +84,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest { val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory) TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L) - val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 + - LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2 + val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 + + LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2 TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes, s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 83098bf..309dee8 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -177,6 +177,54 @@ class LogCleanerManagerTest extends Logging { assertEquals(10L, filthiestLog.firstDirtyOffset) } + @Test + def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = { + val tp = new TopicPartition("foo", 0) + val log = createLog(segmentSize = 2048, LogConfig.Compact, tp) + + val logs = new Pool[TopicPartition, Log]() + logs.put(tp, log) + + appendRecords(log, numRecords = 3) + appendRecords(log, numRecords = 3) + appendRecords(log, numRecords = 3) + + assertEquals(1, log.logSegments.size) + + log.maybeIncrementLogStartOffset(2L) + + val cleanerManager = createCleanerManagerMock(logs) + cleanerCheckpoints.put(tp, 0L) + + val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get + assertEquals(2L, filthiestLog.firstDirtyOffset) + } + + @Test + def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = { + // It is possible in the case of an unclean leader election for the checkpoint + // dirty offset to get ahead of the active segment base offset, but still be + // within the range of the log. + + val tp = new TopicPartition("foo", 0) + + val logs = new Pool[TopicPartition, Log]() + val log = createLog(2048, LogConfig.Compact, topicPartition = tp) + logs.put(tp, log) + + appendRecords(log, numRecords = 3) + appendRecords(log, numRecords = 3) + + assertEquals(1, log.logSegments.size) + assertEquals(0L, log.activeSegment.baseOffset) + + val cleanerManager = createCleanerManagerMock(logs) + cleanerCheckpoints.put(tp, 3L) + + val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get + assertEquals(3L, filthiestLog.firstDirtyOffset) + } + /** * When checking for logs with segments ready for deletion * we shouldn't consider logs where cleanup.policy=delete @@ -337,8 +385,8 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) - val lastClean = Map(topicPartition -> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) + val lastCleanOffset = Some(0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2) } @@ -367,8 +415,8 @@ class LogCleanerManagerTest extends Logging { while (log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0) - val lastClean = Map(topicPartition -> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) + val lastCleanOffset = Some(0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2) } @@ -392,8 +440,8 @@ class LogCleanerManagerTest extends Logging { time.sleep(compactionLag + 1) - val lastClean = Map(topicPartition -> 0L) - val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) + val lastCleanOffset = Some(0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2) } @@ -420,8 +468,7 @@ class LogCleanerManagerTest extends Logging { time.sleep(compactionLag + 1) // although the compaction lag has been exceeded, the undecided data should not be cleaned - var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, - Map(topicPartition -> 0L), time.milliseconds()) + var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds()) assertEquals(0L, cleanableOffsets._1) assertEquals(0L, cleanableOffsets._2) @@ -431,16 +478,14 @@ class LogCleanerManagerTest extends Logging { log.onHighWatermarkIncremented(4L) // the first segment should now become cleanable immediately - cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, - Map(topicPartition -> 0L), time.milliseconds()) + cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds()) assertEquals(0L, cleanableOffsets._1) assertEquals(3L, cleanableOffsets._2) time.sleep(compactionLag + 1) // the second segment becomes cleanable after the compaction lag - cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, - Map(topicPartition -> 0L), time.milliseconds()) + cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds()) assertEquals(0L, cleanableOffsets._1) assertEquals(4L, cleanableOffsets._2) } @@ -531,26 +576,28 @@ class LogCleanerManagerTest extends Logging { recordsPerBatch: Int, batchesPerSegment: Int): Unit = { for (i <- 0 until numBatches) { - val startOffset = i * recordsPerBatch - val endOffset = startOffset + recordsPerBatch - var lastTimestamp = 0L - val records = (startOffset until endOffset).map { offset => - val currentTimestamp = time.milliseconds() - if (offset == endOffset - 1) - lastTimestamp = currentTimestamp - - new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes) - } - - log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1) - log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset) - + appendRecords(log, recordsPerBatch) if (i % batchesPerSegment == 0) log.roll() } log.roll() } + private def appendRecords(log: Log, numRecords: Int): Unit = { + val startOffset = log.logEndOffset + val endOffset = startOffset + numRecords + var lastTimestamp = 0L + val records = (startOffset until endOffset).map { offset => + val currentTimestamp = time.milliseconds() + if (offset == endOffset - 1) + lastTimestamp = currentTimestamp + new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes) + } + + log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, records:_*), leaderEpoch = 1) + log.onHighWatermarkIncremented(log.logEndOffsetMetadata.messageOffset) + } + private def makeLog(dir: File = logDir, config: LogConfig) = Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index a864da0..f62205f 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -355,6 +355,7 @@ class LogTest { assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L)) assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L)) assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L)) + assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(6L)) } @Test