[kafka] branch trunk updated: KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) e0b7499103d is described below commit e0b7499103df9222140cdbf7047494d92913987e Author: flashmouse AuthorDate: Fri Aug 4 02:17:08 2023 +0800 KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) In 3.5.0, AbstractStickyAssignor may run a useless loop in performReassignments because isBalanced has a trivial mistake, which results in a rebalance timeout in some situations. Co-authored-by: lixy Reviewers: Ritika Reddy, Philip Nee, Kirk True, Guozhang Wang --- .../consumer/internals/AbstractStickyAssignor.java | 5 +- .../internals/AbstractStickyAssignorTest.java | 86 ++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 1bde792d598..0823752d159 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { // generation amongst for (final TopicPartition tp : memberData.partitions) { if (allTopics.contains(tp.topic())) { -String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); +String otherConsumer = allPreviousPartitionsToOwner.get(tp); if (otherConsumer == null) { // this partition is not owned by other consumer in the same generation ownedPartitions.add(tp); +allPreviousPartitionsToOwner.put(tp, consumer); } else { final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); @@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { if (!currentAssignment.get(consumer).contains(topicPartition)) { String otherConsumer = allPartitions.get(topicPartition); int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size(); -if (consumerPartitionCount < otherConsumerPartitionCount) { +if (consumerPartitionCount + 1 < otherConsumerPartitionCount) { log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.", topicPartition, otherConsumer, consumer); return false; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index cdb0142c49b..71d188f3cd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest { assignor.assignPartitions(partitionsPerTopic, subscriptions); } +@Timeout(90) +@ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) +@ValueSource(booleans = {false, true}) +public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { +initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); +int topicCount = hasConsumerRack ? 
50 : 100; +int partitionCount = 2_00; +int consumerCount = 5_00; + +List topics = new ArrayList<>(); +Map> partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); +} +for (int i = 0; i < consumerCount; i++) { +if (i % 4 == 0) { +
[kafka] branch trunk updated: KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b9936d6292f KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143) b9936d6292f is described below commit b9936d6292f3d7e76260b91e96520f94d5bc9bd7 Author: Yash Mayya AuthorDate: Thu Aug 3 18:07:35 2023 +0100 KAFKA-7438: Replace PowerMockRunner with MockitoJUnitRunner in RetryUtilTest (#14143) Reviewers: Chris Egerton --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java | 2 +- .../java/org/apache/kafka/connect/util/RetryUtilTest.java| 12 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 174f9be0ab8..8babe4ebfc8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -80,7 +80,7 @@ public class RetryUtil { final long end = time.milliseconds() + timeoutMs; int attempt = 0; -Throwable lastError = null; +Throwable lastError; do { attempt++; try { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index 08e2157c7be..b8884b5ada8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -16,10 +16,6 @@ */ package org.apache.kafka.connect.util; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.jupiter.api.Assertions.assertThrows; - import org.apache.kafka.common.errors.TimeoutException; import 
org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.MockTime; @@ -29,13 +25,17 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.junit.MockitoJUnitRunner; import java.time.Duration; import java.util.concurrent.Callable; import java.util.function.Supplier; -@RunWith(PowerMockRunner.class) +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class RetryUtilTest { private final Time mockTime = new MockTime(10);
[kafka] branch trunk updated: MINOR: Fix debug logs to display TimeIndexOffset (#13935)
This is an automated email from the ASF dual-hosted git repository. divijv pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7d39d7400c9 MINOR: Fix debug logs to display TimeIndexOffset (#13935) 7d39d7400c9 is described below commit 7d39d7400c919a519fb73d93e311eba9b13bbb97 Author: Divij Vaidya AuthorDate: Thu Aug 3 11:05:01 2023 +0200 MINOR: Fix debug logs to display TimeIndexOffset (#13935) Reviewers: Luke Chen --- .../java/org/apache/kafka/storage/internals/log/TimeIndex.java | 4 ++-- .../org/apache/kafka/storage/internals/log/TimestampOffset.java| 7 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java index f221cf4b0e0..e1ade8c5314 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java @@ -66,7 +66,7 @@ public class TimeIndex extends AbstractIndex { this.lastEntry = lastEntryFromIndexFile(); log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}", -file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry, mmap().position()); +file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position()); } @Override @@ -278,7 +278,7 @@ public class TimeIndex extends AbstractIndex { super.truncateToEntries0(entries); this.lastEntry = lastEntryFromIndexFile(); log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}", -file().getAbsolutePath(), entries, mmap().position(), lastEntry); +file().getAbsolutePath(), entries, mmap().position(), lastEntry.offset); } finally { lock.unlock(); } diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java index 9b48352c213..9dc35ae32d4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimestampOffset.java @@ -67,4 +67,11 @@ public class TimestampOffset implements IndexEntry { result = 31 * result + Long.hashCode(offset); return result; } + +@Override +public String toString() { +return String.format("TimestampOffset(offset = %d, timestamp = %d)", +offset, +timestamp); +} }
[kafka] branch trunk updated: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d89b26ff443 KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114) d89b26ff443 is described below commit d89b26ff443c8dd5d584ad0a979ac3944366cc06 Author: Kamal Chandraprakash AuthorDate: Thu Aug 3 13:56:00 2023 +0530 KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs (#14114) KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs. Topic -> Broker synonym: local.retention.bytes -> log.local.retention.bytes; local.retention.ms -> log.local.retention.ms. We cannot add a synonym for the `remote.storage.enable` topic property, as it depends on KIP-950. Reviewers: Divij Vaidya, Satish Duggana, Luke Chen --- .../server/ReplicaFetcherTierStateMachine.java | 2 +- core/src/main/scala/kafka/log/UnifiedLog.scala | 2 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 34 ++ core/src/main/scala/kafka/server/KafkaConfig.scala | 13 ++- .../test/scala/unit/kafka/log/LogConfigTest.scala | 39 +-- .../kafka/server/DynamicBrokerConfigTest.scala | 94 + .../scala/unit/kafka/server/KafkaConfigTest.scala | 28 - .../server/config/ServerTopicConfigSynonyms.java | 4 +- .../log/remote/storage/RemoteLogManagerConfig.java | 28 - .../kafka/storage/internals/log/LogConfig.java | 115 - 10 files changed, 292 insertions(+), 67 deletions(-) diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java index 21c99d11474..3f5a1b2c69f 100644 --- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java +++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java @@ -180,7 +180,7 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine { long 
nextOffset; -if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteLogConfig.remoteStorageEnable) { +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 028f2fbbd5e..1b191844e77 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -168,7 +168,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, !(config.compact || Topic.isInternal(topicPartition.topic()) || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic()) || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) && - config.remoteLogConfig.remoteStorageEnable + config.remoteStorageEnable() } /** diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 7c6d5284d71..57aaf396e85 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig} import scala.annotation.nowarn @@ -675,6 +676,39 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok // For update of topic config overrides, only config names and types are 
validated // Names and types have already been validated. For consistency with topic config // validation, no additional validation is performed. + +def validateLogLocalRetentionMs(): Unit = { + val logRetentionMs = newConfig.logRetentionTimeMillis + val logLocalRetentionMs: java.lang.Long = newConfig.logLocalRetentionMs + if (logRetentionMs != -1L && logLocalRetentionMs != -2L) { +if (logLocalRetentionMs == -1L) { + throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, +s"Value must not be -1 as ${KafkaConfig.LogRetentionTimeMillisProp} value is set as $logRetentionMs.") +} +if