This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ac5ddc5 KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left (#10818) ac5ddc5 is described below commit ac5ddc574ef23279267a8f9bda737a840be30c85 Author: iamgd67 <iamg...@sina.com> AuthorDate: Sun Jun 20 06:33:52 2021 +0800 KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left (#10818) To avoid 4-byte relative offset overflow in the log index, the log cleaner checks log segment offsets when grouping to make sure a group's offset range does not exceed Int.MaxValue. This offset check currently does not consider whether the next log segment is empty, so an empty log file is left behind roughly every 2^31 messages. The leftover empty logs are reprocessed in every clean cycle, which rewrites them with the same empty content, causing a small amount of unnecessary I/O. For the __consumer_offsets topic, we can normally set cleanup.policy to compact,delete to work around this. My cluster is 0.10.1.1, but after analyzing the trunk code, it should have the same problem too. 
Co-authored-by: Liu Qiang(BSS-HZ) <qliu...@best-inc.com> Reviewers: Luke Chen <show...@gmail.com>, Guozhang Wang <wangg...@gmail.com> --- core/src/main/scala/kafka/log/LogCleaner.scala | 5 ++- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 46 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 80916cd..1f1d776 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -837,7 +837,10 @@ private[log] class Cleaner(val id: Int, logSize + segs.head.size <= maxSize && indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize && timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && - lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { + //if first segment size is 0, we don't need to do the index offset range check. + //this will avoid empty log left every 2^31 message. 
+ (segs.head.size == 0 || + lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) { group = segs.head :: group logSize += segs.head.size indexSize += segs.head.offsetIndex.sizeInBytes diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 5c91041..9352f10 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1174,6 +1174,52 @@ class LogCleanerTest { "All but the last group should be the target size.") } + @Test + def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={ + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + + val k="key".getBytes() + val v="val".getBytes() + + //create 3 segments + for(i <- 0 until 3){ + log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) + //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment + val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 ) + log.appendAsFollower(records) + assertEquals(i + 1, log.numberOfSegments) + } + + //4th active segment, not clean + log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0) + + val totalSegments = 4 + //last segment not cleanable + val firstUncleanableOffset = log.logEndOffset - 1 + val notCleanableSegments = 1 + + assertEquals(totalSegments, log.numberOfSegments) + var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset) + //because index file uses 4 byte relative index offset and current segments all none empty, + //segments will not group even their size is very small. 
+ assertEquals(totalSegments - notCleanableSegments, groups.size) + //do clean to clean first 2 segments to empty + cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset)) + assertEquals(totalSegments, log.numberOfSegments) + assertEquals(0, log.logSegments.head.size) + + //after clean we got 2 empty segment, they will group together this time + groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset) + val noneEmptySegment = 1 + assertEquals(noneEmptySegment + 1, groups.size) + + //trigger a clean and 2 empty segments should cleaned to 1 + cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset)) + assertEquals(totalSegments - 1, log.numberOfSegments) + } + /** * Validate the logic for grouping log segments together for cleaning when only a small number of * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not