This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac5ddc5  KAFKA-12889: log clean relative index range check of group 
consider empty log segment to avoid too many empty log segment left (#10818)
ac5ddc5 is described below

commit ac5ddc574ef23279267a8f9bda737a840be30c85
Author: iamgd67 <iamg...@sina.com>
AuthorDate: Sun Jun 20 06:33:52 2021 +0800

    KAFKA-12889: log clean relative index range check of group consider empty 
log segment to avoid too many empty log segment left (#10818)
    
    To avoid overflowing the log index's 4-byte relative offset, the log cleaner's 
grouping logic checks log segment offsets to ensure a group's offset range does not exceed Int.MaxValue.
    
    This offset check currently does not consider whether the next log segment is empty, 
so an empty log file is left behind roughly every 2^31 messages.
    
    The leftover empty logs are reprocessed on every clean cycle, which rewrites 
them with the same empty content, causing a small amount of unnecessary I/O.
    
    For __consumer_offsets topic, normally we can set cleanup.policy to 
compact,delete to get rid of this.
    
    My cluster runs 0.10.1.1, but after analyzing the trunk code, it appears to have 
the same problem as well.
    
    Co-authored-by: Liu Qiang(BSS-HZ) <qliu...@best-inc.com>
    
    Reviewers: Luke Chen <show...@gmail.com>, Guozhang Wang <wangg...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  5 ++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 46 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 80916cd..1f1d776 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -837,7 +837,10 @@ private[log] class Cleaner(val id: Int,
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
             timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-            lastOffsetForFirstSegment(segs, firstUncleanableOffset) - 
group.last.baseOffset <= Int.MaxValue) {
+            //if first segment size is 0, we don't need to do the index offset 
range check.
+            //this will avoid empty log left every 2^31 message.
+            (segs.head.size == 0 ||
+              lastOffsetForFirstSegment(segs, firstUncleanableOffset) - 
group.last.baseOffset <= Int.MaxValue)) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5c91041..9352f10 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1174,6 +1174,52 @@ class LogCleanerTest {
       "All but the last group should be the target size.")
   }
 
+  @Test
+  def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    val k="key".getBytes()
+    val v="val".getBytes()
+
+    //create 3 segments
+    for(i <- 0 until 3){
+      log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), 
leaderEpoch = 0)
+      //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last 
message of i-th segment
+      val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+      log.appendAsFollower(records)
+      assertEquals(i + 1, log.numberOfSegments)
+    }
+
+    //4th active segment, not clean
+    log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), 
leaderEpoch = 0)
+
+    val totalSegments = 4
+    //last segment not cleanable
+    val firstUncleanableOffset = log.logEndOffset - 1
+    val notCleanableSegments = 1
+
+    assertEquals(totalSegments, log.numberOfSegments)
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 
Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+    //because index file uses 4 byte relative index offset and current 
segments all none empty,
+    //segments will not group even their size is very small.
+    assertEquals(totalSegments - notCleanableSegments, groups.size)
+    //do clean to clean first 2 segments to empty
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, 
firstUncleanableOffset))
+    assertEquals(totalSegments, log.numberOfSegments)
+    assertEquals(0, log.logSegments.head.size)
+
+    //after clean we got 2 empty segment, they will group together this time
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 
Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+    val noneEmptySegment = 1
+    assertEquals(noneEmptySegment + 1, groups.size)
+
+    //trigger a clean and 2 empty segments should cleaned to 1
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, 
firstUncleanableOffset))
+    assertEquals(totalSegments - 1, log.numberOfSegments)
+  }
+
   /**
    * Validate the logic for grouping log segments together for cleaning when 
only a small number of
    * messages are retained, but the range of offsets is greater than 
Int.MaxValue. A group should not

Reply via email to