cadonna commented on code in PR #17371:
URL: https://github.com/apache/kafka/pull/17371#discussion_r1808685058


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("OptionalGetWithoutIsPresent")
+public class CopartitionedTopicsEnforcerTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+
+    private static Integer emptyTopicPartitionProvider(String topic) {
+        return null;
+    }
+
+    private static Integer firstSecondTopicConsistent(String topic) {
+        if (topic.equals("first") || topic.equals("second")) return 2;
+        return null;
+    }
+
+    private static Integer firstSecondTopicInconsistent(String topic) {
+        if (topic.equals("first")) return 2;
+        if (topic.equals("second")) return 1;
+        return null;
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        assertThrows(StreamsInvalidTopologyException.class, () -> 
validator.enforce(Collections.singleton("topic"),
+            Collections.emptyMap()));
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent);
+        assertThrows(StreamsInconsistentInternalTopicsException.class, () -> 
validator.enforce(Utils.mkSet("first", "second"),

Review Comment:
   `Utils.mkSet()` was removed from the code base since Java 11 provides 
`Set.of()`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("OptionalGetWithoutIsPresent")
+public class CopartitionedTopicsEnforcerTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+
+    private static Integer emptyTopicPartitionProvider(String topic) {
+        return null;
+    }
+
+    private static Integer firstSecondTopicConsistent(String topic) {
+        if (topic.equals("first") || topic.equals("second")) return 2;
+        return null;
+    }
+
+    private static Integer firstSecondTopicInconsistent(String topic) {
+        if (topic.equals("first")) return 2;
+        if (topic.equals("second")) return 1;
+        return null;
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        assertThrows(StreamsInvalidTopologyException.class, () -> 
validator.enforce(Collections.singleton("topic"),
+            Collections.emptyMap()));
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent);
+        assertThrows(StreamsInconsistentInternalTopicsException.class, () -> 
validator.enforce(Utils.mkSet("first", "second"),
+            Collections.emptyMap()));
+    }
+
+
+    @Test
+    public void shouldEnforceCopartitioningOnRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic config = 
createTopicConfig("repartitioned", 10);
+
+        validator.enforce(Utils.mkSet("first", "second", config.name()),
+            Collections.singletonMap(config.name(), config));
+
+        assertEquals(Optional.of(2), config.numberOfPartitions());
+    }
+
+
+    @Test
+    public void 
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        final ConfiguredInternalTopic one = createTopicConfig("one", 1);
+        final ConfiguredInternalTopic two = createTopicConfig("two", 15);
+        final ConfiguredInternalTopic three = createTopicConfig("three", 5);
+        final Map<String, ConfiguredInternalTopic> internalTopicConfigs = new 
HashMap<>();
+
+        internalTopicConfigs.put(one.name(), one);
+        internalTopicConfigs.put(two.name(), two);
+        internalTopicConfigs.put(three.name(), three);
+
+        validator.enforce(Utils.mkSet(one.name(),
+                two.name(),
+                three.name()),

Review Comment:
   nit for readability:
   ```suggestion
           validator.enforce(Utils.mkSet(
               one.name(),
               two.name(),
               three.name()
           ),
   ```
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for enforcing the number of partitions in 
copartitioned topics.
+ */
+public class CopartitionedTopicsEnforcer {
+
+    private final Logger log;
+    private final Function<String, Integer> topicPartitionCountProvider;
+
+    public CopartitionedTopicsEnforcer(final LogContext logContext,
+                                       final Function<String, Integer> 
topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    private static void maybeSetNumberOfPartitionsForInternalTopic(final int 
numPartitionsToUseForRepartitionTopics,
+                                                                   final 
ConfiguredInternalTopic config) {
+        if (!config.hasEnforcedNumberOfPartitions()) {
+            
config.setNumberOfPartitions(numPartitionsToUseForRepartitionTopics);
+        }
+    }
+
+    private static Supplier<StreamsInvalidTopologyException> 
emptyNumberOfPartitionsExceptionSupplier(final String topic) {
+        return () -> new StreamsInvalidTopologyException("Number of partitions 
is not set for topic: " + topic);
+    }
+
+    /**
+     * Enforces the number of partitions for copartitioned topics.
+     *
+     * @param copartitionedTopics the set of copartitioned topics
+     * @param repartitionTopics   a map from repartition topics to their 
internal topic configs
+     */
+    public void enforce(final Set<String> copartitionedTopics,
+                        final Map<String, ConfiguredInternalTopic> 
repartitionTopics) {
+        if (copartitionedTopics.isEmpty()) {
+            return;
+        }
+
+        final Map<Object, ConfiguredInternalTopic> repartitionTopicConfigs =
+            copartitionedTopics.stream()
+                .filter(repartitionTopics::containsKey)
+                .collect(
+                    Collectors.toMap(topic -> topic, repartitionTopics::get));

Review Comment:
   nit:
   ```suggestion
                   .collect(Collectors.toMap(topic -> topic, 
repartitionTopics::get));
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import 
org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("OptionalGetWithoutIsPresent")
+public class CopartitionedTopicsEnforcerTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+
+    private static Integer emptyTopicPartitionProvider(String topic) {
+        return null;
+    }
+
+    private static Integer firstSecondTopicConsistent(String topic) {
+        if (topic.equals("first") || topic.equals("second")) return 2;
+        return null;
+    }
+
+    private static Integer firstSecondTopicInconsistent(String topic) {
+        if (topic.equals("first")) return 2;
+        if (topic.equals("second")) return 1;
+        return null;
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        assertThrows(StreamsInvalidTopologyException.class, () -> 
validator.enforce(Collections.singleton("topic"),
+            Collections.emptyMap()));
+    }
+
+    @Test
+    public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent);
+        assertThrows(StreamsInconsistentInternalTopicsException.class, () -> 
validator.enforce(Utils.mkSet("first", "second"),
+            Collections.emptyMap()));
+    }
+
+
+    @Test
+    public void shouldEnforceCopartitioningOnRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic config = 
createTopicConfig("repartitioned", 10);
+
+        validator.enforce(Utils.mkSet("first", "second", config.name()),
+            Collections.singletonMap(config.name(), config));
+
+        assertEquals(Optional.of(2), config.numberOfPartitions());
+    }
+
+
+    @Test
+    public void 
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider);
+        final ConfiguredInternalTopic one = createTopicConfig("one", 1);
+        final ConfiguredInternalTopic two = createTopicConfig("two", 15);
+        final ConfiguredInternalTopic three = createTopicConfig("three", 5);
+        final Map<String, ConfiguredInternalTopic> internalTopicConfigs = new 
HashMap<>();
+
+        internalTopicConfigs.put(one.name(), one);
+        internalTopicConfigs.put(two.name(), two);
+        internalTopicConfigs.put(three.name(), three);
+
+        validator.enforce(Utils.mkSet(one.name(),
+                two.name(),
+                three.name()),
+            internalTopicConfigs
+        );
+
+        assertEquals(Optional.of(15), one.numberOfPartitions());
+        assertEquals(Optional.of(15), two.numberOfPartitions());
+        assertEquals(Optional.of(15), three.numberOfPartitions());
+    }
+
+    @Test
+    public void 
shouldThrowAnExceptionIfInternalTopicConfigsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitions()
 {
+        final CopartitionedTopicsEnforcer validator = new 
CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent);
+        final ConfiguredInternalTopic topic1 = 
createInternalTopicConfigWithEnforcedNumberOfPartitions("repartitioned-1", 10);
+        final ConfiguredInternalTopic topic2 = 
createInternalTopicConfigWithEnforcedNumberOfPartitions("repartitioned-2", 5);
+
+        final StreamsInconsistentInternalTopicsException ex = assertThrows(
+            StreamsInconsistentInternalTopicsException.class,
+            () -> validator.enforce(Utils.mkSet(topic1.name(), topic2.name()),
+                Utils.mkMap(Utils.mkEntry(topic1.name(), topic1),
+                    Utils.mkEntry(topic2.name(), topic2)))

Review Comment:
   nit for readability:
   ```suggestion
                   Utils.mkMap(
                       Utils.mkEntry(topic1.name(), topic1),
                       Utils.mkEntry(topic2.name(), topic2)
                   )
               )
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * ConfiguredInternalTopic captures the properties required for configuring
+ * the internal topics we create for change-logs and repartitioning etc.
+ *
+ * It is derived from the topology sent by the client, and the current state
+ * of the topics inside the broker. If the topics on the broker changes, the
+ * internal topic may need to be reconfigured.
+ */
+public class ConfiguredInternalTopic {
+
+    private final String name;
+    private final Map<String, String> topicConfigs;
+    private final Optional<Short> replicationFactor;
+    private final boolean enforceNumberOfPartitions;
+    private Optional<Integer> numberOfPartitions;
+
+    public ConfiguredInternalTopic(final String name) {
+        this(name, Collections.emptyMap(), Optional.empty(), Optional.empty());
+    }
+
+    public ConfiguredInternalTopic(final String name,
+                                   final Map<String, String> topicConfigs) {
+        this(name, topicConfigs, Optional.empty(), Optional.empty());
+    }
+
+    public ConfiguredInternalTopic(
+        final String name,
+        final Map<String, String> topicConfigs,
+        final Optional<Integer> numberOfPartitions,
+        final Optional<Short> replicationFactor) {

Review Comment:
   Also here and elsewhere:
   ```suggestion
       public ConfiguredInternalTopic(final String name,
                                      final Map<String, String> topicConfigs,
                                      final Optional<Integer> 
numberOfPartitions,
                                      final Optional<Short> replicationFactor) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class InternalTopicManager {
+
+    public static Map<String, CreatableTopic> missingTopics(Map<String, 
ConfiguredSubtopology> subtopologyMap,
+                                                            MetadataImage 
metadataImage) {
+
+        final Map<String, CreatableTopic> topicsToCreate = new HashMap<>();
+        for (ConfiguredSubtopology subtopology : subtopologyMap.values()) {
+            subtopology.repartitionSourceTopics().values()
+                .forEach(x -> topicsToCreate.put(x.name(), 
toCreatableTopic(x)));
+            subtopology.stateChangelogTopics().values()
+                .forEach(x -> topicsToCreate.put(x.name(), 
toCreatableTopic(x)));
+        }
+        // TODO: Validate if existing topics are compatible with the new topics
+        for (String topic : metadataImage.topics().topicsByName().keySet()) {
+            topicsToCreate.remove(topic);
+        }
+        return topicsToCreate;
+    }
+
+
+    public static Map<String, ConfiguredSubtopology> 
configureTopics(LogContext logContext,
+                                                          
List<StreamsGroupTopologyValue.Subtopology> subtopologyList,
+                                                          MetadataImage 
metadataImage) {
+
+        final Logger log = logContext.logger(InternalTopicManager.class);
+
+        final Map<String, ConfiguredSubtopology> configuredSubtopologies =
+            subtopologyList.stream()
+                .collect(Collectors.toMap(
+                    StreamsGroupTopologyValue.Subtopology::subtopologyId,
+                    InternalTopicManager::fromPersistedSubtopology)
+                );
+
+        final Map<String, Collection<Set<String>>> 
copartitionGroupsBySubtopology =
+            subtopologyList.stream()
+                .collect(Collectors.toMap(
+                    StreamsGroupTopologyValue.Subtopology::subtopologyId,
+                    
InternalTopicManager::copartitionGroupsFromPersistedSubtopology)
+                );
+
+        final Map<String, ConfiguredInternalTopic> configuredInternalTopics =
+            configuredSubtopologies.values().stream().flatMap(x ->
+                Stream.concat(
+                    x.repartitionSourceTopics().values().stream(),
+                    x.stateChangelogTopics().values().stream()
+                )
+            ).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x));
+
+        final Function<String, Integer> topicPartitionCountProvider =
+            x -> getPartitionCount(metadataImage, x, configuredInternalTopics);

Review Comment:
   Could you please use `topic` instead of `x` to make the code better 
readable? 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.function.Function;
+
+/**
+ * This class is responsible for setting up the changelog topics for a 
topology.
+ */
+public class ChangelogTopics {
+
+    private final Map<String, ConfiguredSubtopology> subtopologies;
+    private final Function<String, Integer> topicPartitionCountProvider;
+    private final Logger log;
+
+    public ChangelogTopics(
+        final LogContext logContext,
+        final Map<String, ConfiguredSubtopology> subtopologies,
+        final Function<String, Integer> topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.subtopologies = subtopologies;
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }

Review Comment:
   Could you please change the formatting to the following? It it really hard 
to read if there is not enough visual separation between code and parameters.
   ```suggestion
       public ChangelogTopics(final LogContext logContext,
                              final Map<String, ConfiguredSubtopology> 
subtopologies,
                              final Function<String, Integer> 
topicPartitionCountProvider) {
           this.log = logContext.logger(getClass());
           this.subtopologies = subtopologies;
           this.topicPartitionCountProvider = topicPartitionCountProvider;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to