[kafka] branch trunk updated: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left (#10818)

2021-06-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ac5ddc5  KAFKA-12889: log clean relative index range check of group 
consider empty log segment to avoid too many empty log segment left (#10818)
ac5ddc5 is described below

commit ac5ddc574ef23279267a8f9bda737a840be30c85
Author: iamgd67 
AuthorDate: Sun Jun 20 06:33:52 2021 +0800

KAFKA-12889: log clean relative index range check of group consider empty 
log segment to avoid too many empty log segment left (#10818)

To avoid overflowing the 4-byte relative offsets in the log index, the log cleaner
checks segment offsets when grouping to ensure a group's offset range does not
exceed Int.MaxValue.

This offset check currently does not consider whether the next log segment is
empty, so an empty log file is left behind roughly every 2^31 messages.

These leftover empty logs are reprocessed on every clean cycle, which rewrites
them with the same empty content, causing unnecessary I/O.

For the __consumer_offsets topic, this can normally be avoided by setting
cleanup.policy to compact,delete.

My cluster runs 0.10.1.1, but after analyzing the trunk code, trunk should have
the same problem.
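The grouping rule this commit relaxes can be modeled with a short sketch (a minimal Python model under assumed segment tuples, not Kafka's actual Scala code): a segment only joins the current group while the group's offset span still fits a 4-byte relative index offset, and the fix exempts empty segments from that span check so they can merge instead of being left behind.

```python
INT_MAX = 2**31 - 1  # largest span a 4-byte relative index offset can cover

def group_segments(segments):
    """Greedy grouping sketch. Each segment is (base_offset, size_bytes,
    last_offset), in ascending offset order. A segment joins the current
    group only if the resulting offset span still fits Int.MaxValue --
    unless it is empty (size == 0), the relaxation this commit introduces."""
    groups, current = [], []
    for base, size, last in segments:
        if not current:
            current = [(base, size, last)]
        elif size == 0 or last - current[0][0] <= INT_MAX:
            current.append((base, size, last))
        else:
            groups.append(current)
            current = [(base, size, last)]
    if current:
        groups.append(current)
    return groups
```

Two non-empty segments that each span about 2^31 offsets stay in separate groups, while the same segments, once cleaned down to empty, merge into one group and can be rewritten as a single segment.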

Co-authored-by: Liu Qiang(BSS-HZ) 

Reviewers: Luke Chen , Guozhang Wang 
---
 core/src/main/scala/kafka/log/LogCleaner.scala |  5 ++-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 46 ++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 80916cd..1f1d776 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -837,7 +837,10 @@ private[log] class Cleaner(val id: Int,
 logSize + segs.head.size <= maxSize &&
 indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize &&
 timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-lastOffsetForFirstSegment(segs, firstUncleanableOffset) - 
group.last.baseOffset <= Int.MaxValue) {
+//if first segment size is 0, we don't need to do the index offset 
range check.
+//this will avoid empty log left every 2^31 message.
+(segs.head.size == 0 ||
+  lastOffsetForFirstSegment(segs, firstUncleanableOffset) - 
group.last.baseOffset <= Int.MaxValue)) {
 group = segs.head :: group
 logSize += segs.head.size
 indexSize += segs.head.offsetIndex.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5c91041..9352f10 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1174,6 +1174,52 @@ class LogCleanerTest {
   "All but the last group should be the target size.")
   }
 
+  @Test
+  def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
+val cleaner = makeCleaner(Int.MaxValue)
+val logProps = new Properties()
+val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+val k="key".getBytes()
+val v="val".getBytes()
+
+//create 3 segments
+for(i <- 0 until 3){
+  log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), 
leaderEpoch = 0)
+  //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last 
message of i-th segment
+  val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+  log.appendAsFollower(records)
+  assertEquals(i + 1, log.numberOfSegments)
+}
+
+//4th active segment, not clean
+log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), 
leaderEpoch = 0)
+
+val totalSegments = 4
+//last segment not cleanable
+val firstUncleanableOffset = log.logEndOffset - 1
+val notCleanableSegments = 1
+
+assertEquals(totalSegments, log.numberOfSegments)
+var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 
Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+//because index file uses 4 byte relative index offset and current 
segments all none empty,
+//segments will not group even their size is very small.
+assertEquals(totalSegments - notCleanableSegments, groups.size)
+//do clean to clean first 2 segments to empty
+cleaner.clean(LogToClean(log.topicPartition, log, 0, 
firstUncleanableOffset))
+assertEquals(totalSegments, log.numberOfSegments)
+assertEquals(0, log.logSegments.head.size)
+
+//after clean we got 2 empty segment, they will group together this time
+groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 
Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
+val 

[kafka] branch trunk updated: MINOR: Addressed minor typos in READMEs. (#10905)

2021-06-19 Thread chia7712
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d3709da  MINOR: Addressed minor typos in READMEs. (#10905)
d3709da is described below

commit d3709dafbe4d77e0635d3a39bbb9a4f14be159bf
Author: Satish Duggana 
AuthorDate: Sat Jun 19 21:28:57 2021 +0530

MINOR: Addressed minor typos in READMEs. (#10905)

Reviewers: Chia-Ping Tsai 
---
 clients/src/main/resources/common/message/README.md | 6 +++---
 core/src/test/java/kafka/test/junit/README.md   | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/resources/common/message/README.md 
b/clients/src/main/resources/common/message/README.md
index 29a8491..6185b87 100644
--- a/clients/src/main/resources/common/message/README.md
+++ b/clients/src/main/resources/common/message/README.md
@@ -135,7 +135,7 @@ version.
 You can remove support for a tagged field from a specific version of a message,
 but you can't reuse a tag once it has been used for something else.  Once tags
 have been used for something, they can't be used for anything else, without
-breaking compatibilty.
+breaking compatibility.
 
 Note that tagged fields can only be added to "flexible" message versions.
 
@@ -145,7 +145,7 @@ Kafka serialization has been improved over time to be more 
flexible and
 efficient.  Message versions that contain these improvements are referred to as
 "flexible versions."
 
-In flexible verisons, variable-length fields such as strings, arrays, and bytes
+In flexible versions, variable-length fields such as strings, arrays, and bytes
 fields are serialized in a more efficient way that saves space.  The new
 serialization types start with compact.  For example COMPACT_STRING is a more
 efficient form of STRING.
@@ -190,7 +190,7 @@ been set:
 
 * Array fields default to empty.
 
-You can specify "null" as a default value for a string field by specifing the
+You can specify "null" as a default value for a string field by specifying the
 literal string "null".  Note that you can only specify null as a default if all
 versions of the field are nullable.
 
diff --git a/core/src/test/java/kafka/test/junit/README.md 
b/core/src/test/java/kafka/test/junit/README.md
index dbd2bf4..6df7a26 100644
--- a/core/src/test/java/kafka/test/junit/README.md
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -135,5 +135,5 @@ class SomeTestClass(helper: IntegrationTestHelper) {
 
 # Gotchas
 * Test methods annotated with JUnit's `@Test` will still be run, but no 
cluster will be started and no dependency 
-  injection will happen. This is generally not what you want
-* Even though ClusterConfig is accessible and mutable inside the test method, 
changing it will have no affect on the cluster 
\ No newline at end of file
+  injection will happen. This is generally not what you want.
+* Even though ClusterConfig is accessible and mutable inside the test method, 
changing it will have no effect on the cluster.
\ No newline at end of file
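The space saving the README describes comes from length prefixes encoded as unsigned varints: COMPACT_STRING stores length + 1 as a varint instead of a fixed two-byte INT16. A rough sketch of that encoding (illustrative, not the Kafka clients' actual serialization code):

```python
def encode_unsigned_varint(n: int) -> bytes:
    """Encode a non-negative int as an unsigned varint: 7 bits per byte,
    high bit set on every byte except the last."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def encode_compact_string(s: str) -> bytes:
    """COMPACT_STRING: varint-encoded (length + 1), then the UTF-8 bytes.
    A stored length of 0 is reserved to represent a null string."""
    data = s.encode("utf-8")
    return encode_unsigned_varint(len(data) + 1) + data
```

For short strings the prefix costs a single byte instead of the two bytes a fixed INT16 length would take, which is where the "more efficient" claim comes from.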


[kafka] branch trunk updated: MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexistent brokers (#10908)

2021-06-19 Thread chia7712
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 01c2345  MINOR: fix round_trip_fault_test.py - don't assign replicas 
to nonexistent brokers (#10908)
01c2345 is described below

commit 01c234565825f318b283370b55a8366816f6838f
Author: Chia-Ping Tsai 
AuthorDate: Sat Jun 19 23:54:02 2021 +0800

MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexistent 
brokers (#10908)

The broker id starts at 1 
(https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/kafka/kafka.py#L207), 
so round_trip_fault_test.py fails because it assigns a replica to a nonexistent 
broker.

Interestingly, the failure happens only in KRaft mode, which checks that the 
assigned broker ids exist 
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L950).
 By contrast, ZK mode has no such check, and since min.insync.replicas is set to 1, 
the test passes in ZK mode even though one replica is always offline.
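The KRaft-side validation described above can be sketched as follows (a hypothetical helper mirroring the idea of the ReplicationControlManager check, not its actual Java code):

```python
def validate_assignments(partition_assignments, broker_ids):
    """Raise if any partition is assigned a replica on a broker id that does
    not exist -- the kind of check KRaft performs and ZK mode skips."""
    for partition, replicas in partition_assignments.items():
        missing = sorted(set(replicas) - set(broker_ids))
        if missing:
            raise ValueError(
                f"partition {partition} assigned to nonexistent brokers: {missing}")
```

With broker ids 1..3, the old assignment [0, 1, 2] fails this check, while the fixed assignment [1, 2, 3] passes.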

Reviewers: Ismael Juma 
---
 tests/kafkatest/tests/core/round_trip_fault_test.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py 
b/tests/kafkatest/tests/core/round_trip_fault_test.py
index b674e96..72f4c66 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -46,7 +46,8 @@ class RoundTripFaultTest(Test):
   client_services=trogdor_client_services)
 topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index
 RoundTripFaultTest.topic_name_index = 
RoundTripFaultTest.topic_name_index + 1
-active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}}
+# note that the broker.id values will be 1..num_nodes
+active_topics={topic_name : {"partitionAssignments":{"0": [1,2,3]}}}
 self.round_trip_spec = RoundTripWorkloadSpec(0, 
TaskSpec.MAX_DURATION_MS,
  self.workload_service.client_node,
  self.workload_service.bootstrap_servers,