cadonna commented on code in PR #17371: URL: https://github.com/apache/kafka/pull/17371#discussion_r1808685058
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +public class CopartitionedTopicsEnforcerTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + + private static Integer emptyTopicPartitionProvider(String topic) { + return null; + } + + private static Integer firstSecondTopicConsistent(String topic) { + if (topic.equals("first") || topic.equals("second")) return 2; + return null; + } + 
+ private static Integer firstSecondTopicInconsistent(String topic) { + if (topic.equals("first")) return 2; + if (topic.equals("second")) return 1; + return null; + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + assertThrows(StreamsInvalidTopologyException.class, () -> validator.enforce(Collections.singleton("topic"), + Collections.emptyMap())); + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent); + assertThrows(StreamsInconsistentInternalTopicsException.class, () -> validator.enforce(Utils.mkSet("first", "second"), Review Comment: `Utils.mkSet()` was removed from the code base since Java 11 provides `Set.of()`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +public class CopartitionedTopicsEnforcerTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + + private static Integer emptyTopicPartitionProvider(String topic) { + return null; + } + + private static Integer firstSecondTopicConsistent(String topic) { + if (topic.equals("first") || topic.equals("second")) return 2; + return null; + } + + private static Integer firstSecondTopicInconsistent(String topic) { + if (topic.equals("first")) return 2; + if (topic.equals("second")) return 1; + return null; + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + assertThrows(StreamsInvalidTopologyException.class, () -> validator.enforce(Collections.singleton("topic"), + Collections.emptyMap())); + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, 
CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent); + assertThrows(StreamsInconsistentInternalTopicsException.class, () -> validator.enforce(Utils.mkSet("first", "second"), + Collections.emptyMap())); + } + + + @Test + public void shouldEnforceCopartitioningOnRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic config = createTopicConfig("repartitioned", 10); + + validator.enforce(Utils.mkSet("first", "second", config.name()), + Collections.singletonMap(config.name(), config)); + + assertEquals(Optional.of(2), config.numberOfPartitions()); + } + + + @Test + public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + final ConfiguredInternalTopic one = createTopicConfig("one", 1); + final ConfiguredInternalTopic two = createTopicConfig("two", 15); + final ConfiguredInternalTopic three = createTopicConfig("three", 5); + final Map<String, ConfiguredInternalTopic> internalTopicConfigs = new HashMap<>(); + + internalTopicConfigs.put(one.name(), one); + internalTopicConfigs.put(two.name(), two); + internalTopicConfigs.put(three.name(), three); + + validator.enforce(Utils.mkSet(one.name(), + two.name(), + three.name()), Review Comment: nit for readability: ```suggestion validator.enforce(Utils.mkSet( one.name(), two.name(), three.name() ), ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class is responsible for enforcing the number of partitions in copartitioned topics. 
+ */ +public class CopartitionedTopicsEnforcer { + + private final Logger log; + private final Function<String, Integer> topicPartitionCountProvider; + + public CopartitionedTopicsEnforcer(final LogContext logContext, + final Function<String, Integer> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + private static void maybeSetNumberOfPartitionsForInternalTopic(final int numPartitionsToUseForRepartitionTopics, + final ConfiguredInternalTopic config) { + if (!config.hasEnforcedNumberOfPartitions()) { + config.setNumberOfPartitions(numPartitionsToUseForRepartitionTopics); + } + } + + private static Supplier<StreamsInvalidTopologyException> emptyNumberOfPartitionsExceptionSupplier(final String topic) { + return () -> new StreamsInvalidTopologyException("Number of partitions is not set for topic: " + topic); + } + + /** + * Enforces the number of partitions for copartitioned topics. + * + * @param copartitionedTopics the set of copartitioned topics + * @param repartitionTopics a map from repartition topics to their internal topic configs + */ + public void enforce(final Set<String> copartitionedTopics, + final Map<String, ConfiguredInternalTopic> repartitionTopics) { + if (copartitionedTopics.isEmpty()) { + return; + } + + final Map<Object, ConfiguredInternalTopic> repartitionTopicConfigs = + copartitionedTopics.stream() + .filter(repartitionTopics::containsKey) + .collect( + Collectors.toMap(topic -> topic, repartitionTopics::get)); Review Comment: nit: ```suggestion .collect(Collectors.toMap(topic -> topic, repartitionTopics::get)); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInconsistentInternalTopicsException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +public class CopartitionedTopicsEnforcerTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + + private static Integer emptyTopicPartitionProvider(String topic) { + return null; + } + + private static Integer firstSecondTopicConsistent(String topic) { + if (topic.equals("first") || topic.equals("second")) return 2; + return null; + } + + private static Integer firstSecondTopicInconsistent(String topic) { + if (topic.equals("first")) return 2; + if (topic.equals("second")) return 1; + return null; + } + + @Test + public void 
shouldThrowStreamsInconsistentInternalTopicsExceptionIfNoPartitionsFoundForCoPartitionedTopic() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + assertThrows(StreamsInvalidTopologyException.class, () -> validator.enforce(Collections.singleton("topic"), + Collections.emptyMap())); + } + + @Test + public void shouldThrowStreamsInconsistentInternalTopicsExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::firstSecondTopicInconsistent); + assertThrows(StreamsInconsistentInternalTopicsException.class, () -> validator.enforce(Utils.mkSet("first", "second"), + Collections.emptyMap())); + } + + + @Test + public void shouldEnforceCopartitioningOnRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic config = createTopicConfig("repartitioned", 10); + + validator.enforce(Utils.mkSet("first", "second", config.name()), + Collections.singletonMap(config.name(), config)); + + assertEquals(Optional.of(2), config.numberOfPartitions()); + } + + + @Test + public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::emptyTopicPartitionProvider); + final ConfiguredInternalTopic one = createTopicConfig("one", 1); + final ConfiguredInternalTopic two = createTopicConfig("two", 15); + final ConfiguredInternalTopic three = createTopicConfig("three", 5); + final Map<String, ConfiguredInternalTopic> internalTopicConfigs = new HashMap<>(); + + internalTopicConfigs.put(one.name(), one); + internalTopicConfigs.put(two.name(), two); + 
internalTopicConfigs.put(three.name(), three); + + validator.enforce(Utils.mkSet(one.name(), + two.name(), + three.name()), + internalTopicConfigs + ); + + assertEquals(Optional.of(15), one.numberOfPartitions()); + assertEquals(Optional.of(15), two.numberOfPartitions()); + assertEquals(Optional.of(15), three.numberOfPartitions()); + } + + @Test + public void shouldThrowAnExceptionIfInternalTopicConfigsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitions() { + final CopartitionedTopicsEnforcer validator = new CopartitionedTopicsEnforcer(LOG_CONTEXT, CopartitionedTopicsEnforcerTest::firstSecondTopicConsistent); + final ConfiguredInternalTopic topic1 = createInternalTopicConfigWithEnforcedNumberOfPartitions("repartitioned-1", 10); + final ConfiguredInternalTopic topic2 = createInternalTopicConfigWithEnforcedNumberOfPartitions("repartitioned-2", 5); + + final StreamsInconsistentInternalTopicsException ex = assertThrows( + StreamsInconsistentInternalTopicsException.class, + () -> validator.enforce(Utils.mkSet(topic1.name(), topic2.name()), + Utils.mkMap(Utils.mkEntry(topic1.name(), topic1), + Utils.mkEntry(topic2.name(), topic2))) Review Comment: nit for readability: ```suggestion Utils.mkMap( Utils.mkEntry(topic1.name(), topic1), Utils.mkEntry(topic2.name(), topic2) ) ) ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.internals.Topic; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * ConfiguredInternalTopic captures the properties required for configuring + * the internal topics we create for change-logs and repartitioning etc. + * + * It is derived from the topology sent by the client, and the current state + * of the topics inside the broker. If the topics on the broker changes, the + * internal topic may need to be reconfigured. 
+ */ +public class ConfiguredInternalTopic { + + private final String name; + private final Map<String, String> topicConfigs; + private final Optional<Short> replicationFactor; + private final boolean enforceNumberOfPartitions; + private Optional<Integer> numberOfPartitions; + + public ConfiguredInternalTopic(final String name) { + this(name, Collections.emptyMap(), Optional.empty(), Optional.empty()); + } + + public ConfiguredInternalTopic(final String name, + final Map<String, String> topicConfigs) { + this(name, topicConfigs, Optional.empty(), Optional.empty()); + } + + public ConfiguredInternalTopic( + final String name, + final Map<String, String> topicConfigs, + final Optional<Integer> numberOfPartitions, + final Optional<Short> replicationFactor) { Review Comment: Also here and elsewhere: ```suggestion public ConfiguredInternalTopic(final String name, final Map<String, String> topicConfigs, final Optional<Integer> numberOfPartitions, final Optional<Short> replicationFactor) { ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class InternalTopicManager { + + public static Map<String, CreatableTopic> missingTopics(Map<String, ConfiguredSubtopology> subtopologyMap, + MetadataImage metadataImage) { + + final Map<String, CreatableTopic> topicsToCreate = new HashMap<>(); + for (ConfiguredSubtopology subtopology : subtopologyMap.values()) { + subtopology.repartitionSourceTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + subtopology.stateChangelogTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + } + // TODO: Validate if existing topics are compatible with the new topics + for (String topic : metadataImage.topics().topicsByName().keySet()) { + topicsToCreate.remove(topic); + } + return topicsToCreate; + } + + + public static Map<String, ConfiguredSubtopology> configureTopics(LogContext logContext, + List<StreamsGroupTopologyValue.Subtopology> subtopologyList, + MetadataImage metadataImage) { + + final Logger log = logContext.logger(InternalTopicManager.class); + + final Map<String, ConfiguredSubtopology> configuredSubtopologies = 
+ subtopologyList.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + InternalTopicManager::fromPersistedSubtopology) + ); + + final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology = + subtopologyList.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + InternalTopicManager::copartitionGroupsFromPersistedSubtopology) + ); + + final Map<String, ConfiguredInternalTopic> configuredInternalTopics = + configuredSubtopologies.values().stream().flatMap(x -> + Stream.concat( + x.repartitionSourceTopics().values().stream(), + x.stateChangelogTopics().values().stream() + ) + ).collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)); + + final Function<String, Integer> topicPartitionCountProvider = + x -> getPartitionCount(metadataImage, x, configuredInternalTopics); Review Comment: Could you please use `topic` instead of `x` to make the code more readable? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.function.Function; + +/** + * This class is responsible for setting up the changelog topics for a topology. + */ +public class ChangelogTopics { + + private final Map<String, ConfiguredSubtopology> subtopologies; + private final Function<String, Integer> topicPartitionCountProvider; + private final Logger log; + + public ChangelogTopics( + final LogContext logContext, + final Map<String, ConfiguredSubtopology> subtopologies, + final Function<String, Integer> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.subtopologies = subtopologies; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } Review Comment: Could you please change the formatting to the following? It is really hard to read if there is not enough visual separation between code and parameters. ```suggestion public ChangelogTopics(final LogContext logContext, final Map<String, ConfiguredSubtopology> subtopologies, final Function<String, Integer> topicPartitionCountProvider) { this.log = logContext.logger(getClass()); this.subtopologies = subtopologies; this.topicPartitionCountProvider = topicPartitionCountProvider; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
