This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c958d8719dc Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527) c958d8719dc is described below commit c958d8719dc2588bd27958b54a65dea514808796 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Fri Apr 14 12:08:49 2023 -0700 Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527) This reverts commit d9b139220ee253da673af44d58dc87bd184188f1. KIP-878 implementation did not make any progress, so we need to revert the public API changes which are not functional right now. Reviewers: Bill Bejeck <b...@confluent.io> --- .../org/apache/kafka/streams/StreamsConfig.java | 25 ----- .../assignment/AssignorConfiguration.java | 12 --- .../apache/kafka/streams/StreamsConfigTest.java | 16 ---- .../internals/assignment/AssignmentTestUtils.java | 88 ------------------ .../assignment/AssignorConfigurationTest.java | 2 +- .../ClientTagAwareStandbyTaskAssignorTest.java | 2 - .../assignment/FallbackPriorTaskAssignorTest.java | 2 +- .../HighAvailabilityTaskAssignorTest.java | 102 +++++++++++++-------- .../assignment/StandbyTaskAssignorFactoryTest.java | 4 - .../assignment/StickyTaskAssignorTest.java | 4 +- .../assignment/TaskAssignorConvergenceTest.java | 25 +++-- 11 files changed, 84 insertions(+), 198 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bb0c519d380..6eb34dfd9dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -631,18 +631,6 @@ public class StreamsConfig extends AbstractConfig { public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; - /** {@code partition.autoscaling.enabled} */ - @SuppressWarnings("WeakerAccess") - public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled"; - private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams." - + " If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input " - + "topic(s) will result in the internal topic(s) automatically being expanded to match."; - - /** {@code partition.autoscaling.timeout.ms} */ - @SuppressWarnings("WeakerAccess") - public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms"; - private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions."; - /** {@code poll.ms} */ @SuppressWarnings("WeakerAccess") public static final String POLL_MS_CONFIG = "poll.ms"; @@ -1014,17 +1002,6 @@ public class StreamsConfig extends AbstractConfig { true, Importance.LOW, CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC) - .define(PARTITION_AUTOSCALING_ENABLED_CONFIG, - Type.BOOLEAN, - false, - Importance.LOW, - PARTITION_AUTOSCALING_ENABLED_DOC) - .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, - Type.LONG, - 15 * 60 * 1000L, - atLeast(0), - Importance.LOW, - PARTITION_AUTOSCALING_TIMEOUT_MS_DOC) .define(POLL_MS_CONFIG, Type.LONG, 100L, @@ -1563,8 +1540,6 @@ public class StreamsConfig extends AbstractConfig { consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG)); consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG)); - consumerProps.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG)); - consumerProps.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG)); consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG)); consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 3825a33eb34..d40489eab29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -261,16 +261,10 @@ public final class AssignorConfiguration { void onAssignmentComplete(final boolean stable); } - /** - * NOTE: any StreamsConfig you add here MUST be passed in to the consumer via - * {@link StreamsConfig#getMainConsumerConfigs} - */ public static class AssignmentConfigs { public final long acceptableRecoveryLag; public final int maxWarmupReplicas; public final int numStandbyReplicas; - public final boolean partitionAutoscalingEnabled; - public final long partitionAutoscalingTimeoutMs; public final long probingRebalanceIntervalMs; public final List<String> rackAwareAssignmentTags; @@ -278,8 +272,6 @@ public final class AssignorConfiguration { acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); - partitionAutoscalingEnabled = configs.getBoolean(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG); - partitionAutoscalingTimeoutMs = configs.getLong(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG); probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); } @@ -287,15 +279,11 @@ public final class AssignorConfiguration { AssignmentConfigs(final Long acceptableRecoveryLag, final Integer maxWarmupReplicas, final Integer numStandbyReplicas, - final boolean partitionAutoscalingEnabled, - final long partitionAutoscalingTimeoutMs, final Long probingRebalanceIntervalMs, final List<String> rackAwareAssignmentTags) { this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag); this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas); this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); - this.partitionAutoscalingEnabled = validated(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG, partitionAutoscalingEnabled); - this.partitionAutoscalingTimeoutMs = validated(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, partitionAutoscalingTimeoutMs); this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs); this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 29c1be977c5..05582b74aee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -62,8 +62,6 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH; -import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; @@ -1371,20 +1369,6 @@ public class StreamsConfigTest { assertEquals(0, configs.size()); } - @Test - public void shouldEnablePartitionAutoscaling() { - props.put("partition.autoscaling.enabled", true); - final StreamsConfig config = new StreamsConfig(props); - assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG)); - } - - @Test - public void shouldSetPartitionAutoscalingTimeout() { - props.put("partition.autoscaling.timeout.ms", 0L); - final StreamsConfig config = new StreamsConfig(props); - assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L)); - } - @Test public void shouldReturnDefaultClientSupplier() { final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 181bedab6f5..8993372dd5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -113,93 +112,6 @@ public final class AssignmentTestUtils { private AssignmentTestUtils() {} - public static final long ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT = 100; - - public static AssignmentConfigs getDefaultConfigsWithZeroStandbys() { - return new AssignmentConfigs( - ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT, - 2, - 0, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getDefaultConfigsWithOneStandbys() { - return new AssignmentConfigs( - ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT, - 2, - 1, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getConfigsWithZeroStandbysAndWarmups(final int maxWarmups) { - return new AssignmentConfigs( - ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT, - maxWarmups, - 0, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getConfigsWithOneStandbysAndWarmups(final int maxWarmups) { - return new AssignmentConfigs( - ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT, - maxWarmups, - 1, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getConfigsWithOneStandbysAndZeroLagAndWarmups(final int maxWarmups) { - return new AssignmentConfigs( - 0L, - maxWarmups, - 1, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getConfigsWithZeroStandbysAndZeroLagAndWarmups(final int maxWarmups) { - return new AssignmentConfigs( - 0L, - maxWarmups, - 0, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - - public static AssignmentConfigs getConfigsWithOneStandbysAndLagAndWarmups(final long acceptableRecoveryLag, - final int maxWarmups) { - return new AssignmentConfigs( - acceptableRecoveryLag, - maxWarmups, - 1, - false, - 90_000L, - 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS - ); - } - static Map<UUID, ClientState> getClientStatesMap(final ClientState... states) { final Map<UUID, ClientState> clientStates = new HashMap<>(); int nthState = 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java index ede16ce6372..9ff53b54244 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java @@ -30,7 +30,7 @@ public class AssignorConfigurationTest { public void configsShouldRejectZeroWarmups() { final ConfigException exception = assertThrows( ConfigException.class, - () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, false, 1L, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java index 2b87dc09064..631430c6a82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java @@ -656,8 +656,6 @@ public class ClientTagAwareStandbyTaskAssignorTest { return new AssignmentConfigs(0L, 1, numStandbyReplicas, - false, - 1L, 60000L, asList(rackAwareAssignmentTags)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java index a43eb5fb857..0473d9bee45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java @@ -54,7 +54,7 @@ public class FallbackPriorTaskAssignorTest { clients, new HashSet<>(taskIds), new HashSet<>(taskIds), - new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 1L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(probingRebalanceNeeded, is(true)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index 708f6e3afb0..90e0fed51f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.junit.Test; import java.util.HashMap; @@ -33,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; @@ -54,12 +56,6 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndLagAndWarmups; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndWarmups; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndWarmups; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndZeroLagAndWarmups; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithOneStandbys; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks; @@ -71,7 +67,23 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; -public class HighAvailabilityTaskAssignorTest { +public class HighAvailabilityTaskAssignorTest { + private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( + /*acceptableRecoveryLag*/ 100L, + /*maxWarmupReplicas*/ 2, + /*numStandbyReplicas*/ 0, + /*probingRebalanceIntervalMs*/ 60 * 1000L, + /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS + ); + + private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( + /*acceptableRecoveryLag*/ 100L, + /*maxWarmupReplicas*/ 2, + /*numStandbyReplicas*/ 1, + /*probingRebalanceIntervalMs*/ 60 * 1000L, + /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS + ); + @Test public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() { final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2); @@ -89,7 +101,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithOneStandbysAndLagAndWarmups(11L, 2) + new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(clientState1, hasAssignedTasks(allTaskIds.size())); @@ -118,7 +130,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithOneStandbysAndLagAndWarmups(Long.MAX_VALUE, 1) + new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(clientState1, hasAssignedTasks(6)); @@ -139,7 +151,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); @@ -160,7 +172,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); @@ -180,7 +192,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -202,7 +214,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -231,7 +243,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -253,7 +265,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -278,7 +290,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1) + new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(true)); @@ -308,7 +320,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1) + new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -329,7 +341,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTaskIds, allTaskIds, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(unstable, is(false)); @@ -349,7 +361,7 @@ public class HighAvailabilityTaskAssignorTest { final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, singleton(TASK_0_0), - getDefaultConfigsWithZeroStandbys()); + configWithoutStandbys); assertThat(probingRebalanceNeeded, is(false)); assertThat(client1, hasActiveTasks(2)); @@ -372,7 +384,7 @@ public class HighAvailabilityTaskAssignorTest { final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, - getDefaultConfigsWithOneStandbys()); + configWithStandbys); assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty())); assertThat(probingRebalanceNeeded, is(false)); @@ -394,7 +406,7 @@ public class HighAvailabilityTaskAssignorTest { ); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1))); assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0))); @@ -421,7 +433,7 @@ public class HighAvailabilityTaskAssignorTest { ); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet())); assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet())); @@ -448,7 +460,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0))); @@ -468,7 +480,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertThat(client1.activeTaskCount(), equalTo(1)); @@ -486,7 +498,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); @@ -509,7 +521,13 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithZeroStandbysAndWarmups(1) + new AssignmentConfigs( + /*acceptableRecoveryLag*/ 100L, + /*maxWarmupReplicas*/ 1, + /*numStandbyReplicas*/ 0, + /*probingRebalanceIntervalMs*/ 60 * 1000L, + /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS + ) ); @@ -532,7 +550,13 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithOneStandbysAndWarmups(1) + new AssignmentConfigs( + /*acceptableRecoveryLag*/ 100L, + /*maxWarmupReplicas*/ 1, + /*numStandbyReplicas*/ 1, + /*probingRebalanceIntervalMs*/ 60 * 1000L, + /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS + ) ); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3))); @@ -550,7 +574,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); assertHasNoStandbyTasks(client1); @@ -566,7 +590,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1))); assertHasNoStandbyTasks(client1); assertThat(probingRebalanceNeeded, is(false)); @@ -583,7 +607,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertValidAssignment( 1, @@ -608,7 +632,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys); assertValidAssignment( 1, 2, @@ -641,7 +665,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(client1.activeTasks(), not(empty())); assertThat(client2.activeTasks(), not(empty())); @@ -658,7 +682,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(probingRebalanceNeeded, is(false)); assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks())); @@ -674,7 +698,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(probingRebalanceNeeded, is(false)); assertHasNoStandbyTasks(client1, client2); } @@ -688,7 +712,7 @@ public class HighAvailabilityTaskAssignorTest { final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2); final boolean probingRebalanceNeeded = - new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys()); + new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys); assertThat(probingRebalanceNeeded, is(true)); assertThat(client2.standbyTaskCount(), equalTo(1)); } @@ -710,7 +734,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertValidAssignment( @@ -741,7 +765,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertValidAssignment( @@ -772,7 +796,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertValidAssignment( @@ -803,7 +827,7 @@ public class HighAvailabilityTaskAssignorTest { clientStates, allTasks, statefulTasks, - getConfigsWithZeroStandbysAndZeroLagAndWarmups(1) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertValidAssignment( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java index 0c22cd290e7..fdd7fa1d473 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java @@ -28,8 +28,6 @@ public class StandbyTaskAssignorFactoryTest { private static final long ACCEPTABLE_RECOVERY_LAG = 0L; private static final int MAX_WARMUP_REPLICAS = 1; private static final int NUMBER_OF_STANDBY_REPLICAS = 1; - private static final boolean PARTITION_AUTOSCALING_ENABLED = false; - private static final long PARTITION_AUTOSCALING_TIMEOUT_MS = 90000L; private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L; @Test @@ -48,8 +46,6 @@ public class StandbyTaskAssignorFactoryTest { return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG, MAX_WARMUP_REPLICAS, NUMBER_OF_STANDBY_REPLICAS, - PARTITION_AUTOSCALING_ENABLED, - PARTITION_AUTOSCALING_TIMEOUT_MS, PROBING_REBALANCE_INTERVAL_MS, rackAwareAssignmentTags); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 82e3de4baf7..8c1347f22d9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -677,7 +677,7 @@ public class StickyTaskAssignorTest { clients, new HashSet<>(taskIds), new HashSet<>(taskIds), - new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(probingRebalanceNeeded, is(false)); @@ -696,7 +696,7 @@ public class StickyTaskAssignorTest { clients, new HashSet<>(taskIds), new HashSet<>(taskIds), - new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index 2b1564fa1a1..c1be5f33fa2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -29,13 +29,11 @@ import java.util.TreeSet; import java.util.UUID; import java.util.function.Supplier; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt; import static org.junit.Assert.fail; @@ -231,7 +229,11 @@ public class TaskAssignorConvergenceTest { @Test public void staticAssignmentShouldConvergeWithTheFirstAssignment() { - final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys(); + final AssignmentConfigs configs = new AssignmentConfigs(100L, + 2, + 0, + 60_000L, + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1); @@ -247,7 +249,11 @@ public class TaskAssignorConvergenceTest { final int maxWarmupReplicas = 2; final int numStandbyReplicas = 0; - final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys(); + final AssignmentConfigs configs = new AssignmentConfigs(100L, + maxWarmupReplicas, + numStandbyReplicas, + 60_000L, + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5); testForConvergence(harness, configs, 1); @@ -266,7 +272,11 @@ public class TaskAssignorConvergenceTest { final int maxWarmupReplicas = 2; final int numStandbyReplicas = 0; - final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys(); + final AssignmentConfigs configs = new AssignmentConfigs(100L, + maxWarmupReplicas, + numStandbyReplicas, + 60_000L, + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5); testForConvergence(harness, configs, 1); @@ -304,11 +314,9 @@ public class TaskAssignorConvergenceTest { final int numberOfEvents = prng.nextInt(10) + 1; - final AssignmentConfigs configs = new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT, + final AssignmentConfigs configs = new AssignmentConfigs(100L, maxWarmupReplicas, numStandbyReplicas, - false, - 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); @@ -419,4 +427,5 @@ public class TaskAssignorConvergenceTest { } } + }