cadonna commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r825779088
##########
File path: clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
##########
@@ -722,4 +723,24 @@ public void testNiceTimeUnits() {
         assertEquals(" (365 days)", ConfigDef.niceTimeUnits(Duration.ofDays(365).toMillis()));
     }
 
+    @Test
+    public void testThrowsExceptionWhenListSizeExceedsLimit() {
+        assertThrows(ConfigException.class, () -> new ConfigDef().define("lst",
+                                                                         Type.LIST,
+                                                                         asList("a", "b"),
+                                                                         ListSize.max(1),
+                                                                         Importance.HIGH,
+                                                                         "lst doc"));
+    }
+
+    @Test
+    public void testNoExceptionIsThrownWhenListSizeIsWithinTheLimit() {
+        new ConfigDef().define("lst",
+                               Type.LIST,
+                               asList("a", "b"),
+                               ListSize.max(2),
+                               Importance.HIGH,
+                               "lst doc");
+    }
+
Review comment:
       Could you add unit tests so that there is one with a list that is longer than the maximum, one with a list that is strictly shorter than the maximum, and one with a list that has exactly the maximum length?
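       A minimal sketch of the three cases, reusing the `define()` call from the hunk above and the class's existing static imports (`asList`, `assertThrows`, `Type`, `Importance`); the test names are illustrative:

           @Test
           public void testThrowsExceptionWhenListIsLongerThanTheLimit() {
               // two elements against a maximum of one -> define() must reject the default value
               assertThrows(ConfigException.class, () -> new ConfigDef().define("lst",
                       Type.LIST, asList("a", "b"), ListSize.max(1), Importance.HIGH, "lst doc"));
           }

           @Test
           public void testNoExceptionWhenListIsStrictlyShorterThanTheLimit() {
               // one element against a maximum of two -> must pass
               new ConfigDef().define("lst",
                       Type.LIST, asList("a"), ListSize.max(2), Importance.HIGH, "lst doc");
           }

           @Test
           public void testNoExceptionWhenListHasExactlyTheMaximumLength() {
               // two elements against a maximum of two -> boundary case, must pass
               new ConfigDef().define("lst",
                       Type.LIST, asList("a", "b"), ListSize.max(2), Importance.HIGH, "lst doc");
           }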
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+@Category({IntegrationTest.class})
+public class RackAwarenessIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static final String INPUT_TOPIC = "input-topic";
+    private static final String TAG_ZONE = "zone";
+    private static final String TAG_CLUSTER = "cluster";
+
+    private List<KafkaStreamsWithConfiguration> kafkaStreamsInstances;
+    private Properties baseConfiguration;
+
+    @BeforeClass
+    public static void createTopics() throws Exception {
+        CLUSTER.start();
+        CLUSTER.createTopic(INPUT_TOPIC, 6, 1);
+    }
+
+    @Before
+    public void setup() {
+        kafkaStreamsInstances = new ArrayList<>();
+        baseConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String applicationId = "app-" + safeTestName;
+        baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        baseConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        baseConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        for (final KafkaStreamsWithConfiguration kafkaStreamsWithConfiguration : kafkaStreamsInstances) {
+            kafkaStreamsWithConfiguration.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
+            IntegrationTestUtils.purgeLocalStreamsState(kafkaStreamsWithConfiguration.configuration);
+        }
+        kafkaStreamsInstances.clear();
+    }
+
+    @Test
+    public void shouldDistributeStandbyReplicasBasedOnClientTags() throws Exception {
+        final Topology topology = createStatefulTopology();
+        final int numberOfStandbyReplicas = 1;
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        waitUntilRackAwareTaskDistributionIsReached(TAG_ZONE);
+    }
+
+    @Test
+    public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception {
+        final Topology topology = createStatefulTopology();
+        final int numberOfStandbyReplicas = 2;
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
Review comment:
       I think it makes sense here to wait until all Streams clients are `RUNNING`, so that we know the rebalance is done.
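       For reference, one way to do that — a sketch of a helper for this test class, assuming the existing `IntegrationTestUtils.waitForApplicationState()` utility; the helper name is illustrative:

           // Illustrative helper: block until every Streams client created by the test
           // has reached RUNNING, so the rebalance is known to be complete before
           // asserting on the standby distribution.
           private void waitUntilAllKafkaStreamsClientsAreRunning() throws InterruptedException {
               final List<KafkaStreams> streamsClients = new ArrayList<>();
               for (final KafkaStreamsWithConfiguration instance : kafkaStreamsInstances) {
                   streamsClients.add(instance.kafkaStreams);
               }
               IntegrationTestUtils.waitForApplicationState(streamsClients,
                                                            KafkaStreams.State.RUNNING,
                                                            Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
           }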
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##########
@@ -0,0 +1,340 @@
+    @Test
+    public void shouldDistributeStandbyReplicasBasedOnClientTags() throws Exception {
+        final Topology topology = createStatefulTopology();
+        final int numberOfStandbyReplicas = 1;
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
Review comment:
       See my comment below about waiting for `RUNNING`.

##########
File path: clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
##########
@@ -1121,6 +1121,27 @@ public String toString() {
         }
     }
 
+    public static class ListSize implements Validator {
+        final int maxSize;
+
+        private ListSize(final int maxSize) {
+            this.maxSize = maxSize;
+        }
+
+        public static ListSize max(final int maxSize) {
Review comment:
       Could you rename this method to something a bit more specific like `atMostOfLength()`?
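       A sketch of the validator with the renamed factory; the `ensureValid()` body is an assumption, since the hunk above is cut off at the factory method:

           public static class ListSize implements Validator {
               final int maxSize;

               private ListSize(final int maxSize) {
                   this.maxSize = maxSize;
               }

               // renamed from max(): the name spells out that it bounds the list length
               public static ListSize atMostOfLength(final int maxSize) {
                   return new ListSize(maxSize);
               }

               @Override
               public void ensureValid(final String name, final Object value) {
                   // assumed body: reject any list whose length exceeds the configured maximum
                   final List<?> list = (List<?>) value;
                   if (list.size() > maxSize) {
                       throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "].");
                   }
               }
           }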
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##########
@@ -0,0 +1,340 @@
+    @Test
+    public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception {
+        final Topology topology = createStatefulTopology();
+        final int numberOfStandbyReplicas = 2;
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+        createAndStart(topology, buildClientTags("eu-central-1c", "k8s-cluster-3"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
+
+        waitUntilRackAwareTaskDistributionIsReached(TAG_ZONE, TAG_CLUSTER);
+    }
+
Review comment:
       Could you add a test where the standbys cannot be distributed over all tags, to check that the task assignment still works in that case?
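       For example, something along these lines — a sketch under the assumption that the assignment falls back gracefully when a tag dimension has fewer distinct values than replicas per task; `waitUntilAllStandbyTasksAreAssigned()` is a hypothetical assertion helper:

           @Test
           public void shouldHandleStandbyReplicasThatCannotBeFullyDistributedOverTags() throws Exception {
               final Topology topology = createStatefulTopology();
               // two standbys plus the active make three copies per task,
               // but only two distinct zones exist -> an ideal distribution is impossible
               final int numberOfStandbyReplicas = 2;

               createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
               createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-1"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
               createAndStart(topology, buildClientTags("eu-central-1a", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
               createAndStart(topology, buildClientTags("eu-central-1b", "k8s-cluster-2"), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);

               // hypothetical helper: assert that every task still gets both standbys,
               // even though they cannot all land in distinct zones
               waitUntilAllStandbyTasksAreAssigned();
           }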