[kafka] branch trunk updated: KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)

2023-08-03 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)
e0b7499103d is described below

commit e0b7499103df9222140cdbf7047494d92913987e
Author: flashmouse 
AuthorDate: Fri Aug 4 02:17:08 2023 +0800

KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)

In 3.5.0, AbstractStickyAssignor may run a useless loop in performReassignments because isBalanced has a trivial mistake, which can result in a rebalance timeout in some situations.

Co-authored-by: lixy 
Reviewers: Ritika Reddy, Philip Nee, Kirk True, Guozhang Wang

---
 .../consumer/internals/AbstractStickyAssignor.java |  5 +-
 .../internals/AbstractStickyAssignorTest.java  | 86 ++
 2 files changed, 89 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 1bde792d598..0823752d159 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
                 // generation amongst
                 for (final TopicPartition tp : memberData.partitions) {
                     if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                        String otherConsumer = allPreviousPartitionsToOwner.get(tp);
                         if (otherConsumer == null) {
                             // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
                         } else {
                             final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
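
For readers skimming the hunk above: the old code called Map.put, which returns the previous owner but also records the current consumer as the partition's owner before the generation check runs; the fix reads with get and only claims ownership in the branch where no other owner exists. Below is a minimal standalone sketch of that get-then-claim pattern, using plain JDK maps and made-up consumer/partition names rather than the assignor's real bookkeeping.

    import java.util.HashMap;
    import java.util.Map;

    public class OwnershipTrackingSketch {
        public static void main(String[] args) {
            Map<String, String> previousOwner = new HashMap<>();  // partition -> consumer

            // Buggy pattern: put() looks up and overwrites in one step, so "p0" is
            // re-attributed to consumer-b even if a later check would reject the claim.
            previousOwner.put("p0", "consumer-a");
            String clobbered = previousOwner.put("p0", "consumer-b");  // returns consumer-a, but p0 now maps to consumer-b

            // Fixed pattern: look up first, and only record ownership when the partition is unclaimed.
            previousOwner.clear();
            previousOwner.put("p0", "consumer-a");
            String other = previousOwner.get("p0");
            if (other == null) {
                previousOwner.put("p0", "consumer-b");  // claim only when nobody owns it yet
            }  // else: compare generations before deciding, as the hunk above goes on to do

            System.out.println(clobbered + " / " + other + " -> owner is now " + previousOwner.get("p0"));
        }
    }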
 
@@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
                 if (!currentAssignment.get(consumer).contains(topicPartition)) {
                     String otherConsumer = allPartitions.get(topicPartition);
                     int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
-                    if (consumerPartitionCount < otherConsumerPartitionCount) {
+                    if (consumerPartitionCount + 1 < otherConsumerPartitionCount) {
                         log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
                             topicPartition, otherConsumer, consumer);
                         return false;
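
This second hunk is the core of the fix: with the old strict comparison, isBalanced flags an imbalance whenever another consumer owns just one more partition, yet moving a partition across a gap of exactly one only swaps the counts, so, per the commit message, performReassignments can keep shuffling without converging and the rebalance times out. A tiny self-contained sketch of the corrected predicate (illustrative only, not the assignor's code path):

    public class BalancePredicateSketch {
        // A move from "other" to "consumer" only improves balance when the gap is at least 2.
        static boolean moveImprovesBalance(int consumerPartitionCount, int otherConsumerPartitionCount) {
            return consumerPartitionCount + 1 < otherConsumerPartitionCount;
        }

        public static void main(String[] args) {
            // Gap of 1: 3 vs 4. Moving one partition just swaps the counts to 4 vs 3, so nothing improves.
            System.out.println(moveImprovesBalance(3, 4));  // false -> this assignment already counts as balanced
            // Gap of 2: 3 vs 5. Moving one partition yields 4 vs 4, a strictly better assignment.
            System.out.println(moveImprovesBalance(3, 5));  // true  -> isBalanced correctly returns false
        }
    }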
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index cdb0142c49b..71d188f3cd7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest {
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 100;
+        int partitionCount = 2_00;
+        int consumerCount = 5_00;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+

[kafka] branch trunk updated: KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)

2023-08-03 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b9936d6292f KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)
b9936d6292f is described below

commit b9936d6292f3d7e76260b91e96520f94d5bc9bd7
Author: Yash Mayya 
AuthorDate: Thu Aug 3 18:07:35 2023 +0100

KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)

Reviewers: Chris Egerton 
---
 .../main/java/org/apache/kafka/connect/util/RetryUtil.java   |  2 +-
 .../java/org/apache/kafka/connect/util/RetryUtilTest.java| 12 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index 174f9be0ab8..8babe4ebfc8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -80,7 +80,7 @@ public class RetryUtil {
 
         final long end = time.milliseconds() + timeoutMs;
         int attempt = 0;
-        Throwable lastError = null;
+        Throwable lastError;
         do {
             attempt++;
             try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
index 08e2157c7be..b8884b5ada8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.connect.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.MockTime;
@@ -29,13 +25,17 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 
-@RunWith(PowerMockRunner.class)
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RetryUtilTest {
 
 private final Time mockTime = new MockTime(10);
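
The diff above swaps PowerMockRunner for Mockito's strict-stubs runner. As a reference point, here is a minimal self-contained JUnit 4 test in the same style; the Clock interface and the test itself are made up for illustration, and only the runner annotation and the Mockito calls mirror what the commit uses.

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.mockito.Mockito;
    import org.mockito.junit.MockitoJUnitRunner;

    // StrictStubs fails the test on unused or mismatched stubbings, a check PowerMockRunner never enforced.
    @RunWith(MockitoJUnitRunner.StrictStubs.class)
    public class ClockTest {

        interface Clock {
            long nowMillis();
        }

        @Test
        public void usesMockedClock() {
            Clock clock = Mockito.mock(Clock.class);
            Mockito.when(clock.nowMillis()).thenReturn(42L);

            assertEquals(42L, clock.nowMillis());
            Mockito.verify(clock).nowMillis();
        }
    }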



[kafka] branch trunk updated: MINOR: Fix debug logs to display TimeIndexOffset (#13935)

2023-08-03 Thread divijv
This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7d39d7400c9 MINOR: Fix debug logs to display TimeIndexOffset (#13935)
7d39d7400c9 is described below

commit 7d39d7400c919a519fb73d93e311eba9b13bbb97
Author: Divij Vaidya 
AuthorDate: Thu Aug 3 11:05:01 2023 +0200

MINOR: Fix debug logs to display TimeIndexOffset (#13935)

Reviewers: Luke Chen 
---
 .../java/org/apache/kafka/storage/internals/log/TimeIndex.java | 4 ++--
 .../org/apache/kafka/storage/internals/log/TimestampOffset.java| 7 +++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index f221cf4b0e0..e1ade8c5314 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -66,7 +66,7 @@ public class TimeIndex extends AbstractIndex {
         this.lastEntry = lastEntryFromIndexFile();
 
         log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
-                file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry, mmap().position());
+                file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
     }
 
     @Override
@@ -278,7 +278,7 @@ public class TimeIndex extends AbstractIndex {
             super.truncateToEntries0(entries);
             this.lastEntry = lastEntryFromIndexFile();
             log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}",
-                file().getAbsolutePath(), entries, mmap().position(), lastEntry);
+                file().getAbsolutePath(), entries, mmap().position(), lastEntry.offset);
         } finally {
             lock.unlock();
         }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java
index 9b48352c213..9dc35ae32d4 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java
@@ -67,4 +67,11 @@ public class TimestampOffset implements IndexEntry {
         result = 31 * result + Long.hashCode(offset);
         return result;
     }
+
+    @Override
+    public String toString() {
+        return String.format("TimestampOffset(offset = %d, timestamp = %d)",
+            offset,
+            timestamp);
+    }
 }
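
Why the two log.debug changes and the new toString above go together: before this commit, logging a TimestampOffset presumably fell back to Object.toString (ClassName@hashcode), which is unreadable in the index debug lines. A small standalone sketch with a hypothetical Entry class, not Kafka's, showing the difference:

    public class ToStringDemo {
        // Hypothetical stand-in for an index entry; the fields and format mirror the diff above.
        static class Entry {
            final long timestamp;
            final long offset;

            Entry(long timestamp, long offset) {
                this.timestamp = timestamp;
                this.offset = offset;
            }

            @Override
            public String toString() {
                return String.format("TimestampOffset(offset = %d, timestamp = %d)", offset, timestamp);
            }
        }

        public static void main(String[] args) {
            Entry lastEntry = new Entry(1_691_000_000_000L, 42L);
            // Without the override this line would print something like "Entry@1b6d3586".
            System.out.println("last entry is now " + lastEntry);    // TimestampOffset(offset = 42, timestamp = 1691000000000)
            System.out.println("lastOffset = " + lastEntry.offset);  // 42
        }
    }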



[kafka] branch trunk updated: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114)

2023-08-03 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d89b26ff443 KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114)
d89b26ff443 is described below

commit d89b26ff443c8dd5d584ad0a979ac3944366cc06
Author: Kamal Chandraprakash 
AuthorDate: Thu Aug 3 13:56:00 2023 +0530

KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114)

KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs.

Topic -> Broker Synonym:
local.retention.bytes -> log.local.retention.bytes
local.retention.ms -> log.local.retention.ms

We cannot add a synonym for the `remote.storage.enable` topic property as it depends on KIP-950.

Reviewers: Divij Vaidya, Satish Duggana, Luke Chen
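
To make the synonym mapping in the commit message concrete: the broker-level property acts as the cluster-wide default that a topic-level override can still replace. The sketch below is illustrative only; the property names come from the list above, while the resolve helper, the values, and the fallback logic are assumptions rather than Kafka's actual LogConfig/ServerTopicConfigSynonyms code.

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigSynonymSketch {
        // Topic-level tiered storage configs and their broker-level synonyms, per the commit message.
        private static final Map<String, String> TOPIC_TO_BROKER_SYNONYM = Map.of(
            "local.retention.ms", "log.local.retention.ms",
            "local.retention.bytes", "log.local.retention.bytes");

        // Effective topic config: an explicit topic override wins, otherwise fall back to the broker synonym.
        static String resolve(String topicConfig, Map<String, String> topicOverrides, Map<String, String> brokerConfig) {
            if (topicOverrides.containsKey(topicConfig)) {
                return topicOverrides.get(topicConfig);
            }
            return brokerConfig.get(TOPIC_TO_BROKER_SYNONYM.get(topicConfig));
        }

        public static void main(String[] args) {
            Map<String, String> brokerConfig = Map.of("log.local.retention.ms", "3600000");
            Map<String, String> topicOverrides = new HashMap<>();

            System.out.println(resolve("local.retention.ms", topicOverrides, brokerConfig));  // 3600000 (broker default)
            topicOverrides.put("local.retention.ms", "600000");
            System.out.println(resolve("local.retention.ms", topicOverrides, brokerConfig));  // 600000 (topic override wins)
        }
    }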
---
 .../server/ReplicaFetcherTierStateMachine.java |   2 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  34 ++
 core/src/main/scala/kafka/server/KafkaConfig.scala |  13 ++-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  39 +--
 .../kafka/server/DynamicBrokerConfigTest.scala |  94 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  28 -
 .../server/config/ServerTopicConfigSynonyms.java   |   4 +-
 .../log/remote/storage/RemoteLogManagerConfig.java |  28 -
 .../kafka/storage/internals/log/LogConfig.java | 115 -
 10 files changed, 292 insertions(+), 67 deletions(-)

diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
index 21c99d11474..3f5a1b2c69f 100644
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
@@ -180,7 +180,7 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine {
 
         long nextOffset;
 
-        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteLogConfig.remoteStorageEnable) {
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
             if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
 
             RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 028f2fbbd5e..1b191844e77 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -168,7 +168,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       !(config.compact || Topic.isInternal(topicPartition.topic())
         || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic())
         || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) &&
-      config.remoteLogConfig.remoteStorageEnable
+      config.remoteStorageEnable()
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7c6d5284d71..57aaf396e85 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
 
 import scala.annotation.nowarn
@@ -675,6 +676,39 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
     // For update of topic config overrides, only config names and types are validated
     // Names and types have already been validated. For consistency with topic config
     // validation, no additional validation is performed.
+
+    def validateLogLocalRetentionMs(): Unit = {
+      val logRetentionMs = newConfig.logRetentionTimeMillis
+      val logLocalRetentionMs: java.lang.Long = newConfig.logLocalRetentionMs
+      if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
+        if (logLocalRetentionMs == -1L) {
+          throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
+            s"Value must not be -1 as ${KafkaConfig.LogRetentionTimeMillisProp} value is set as $logRetentionMs.")
+        }
+        if