cadonna commented on code in PR #18268:
URL: https://github.com/apache/kafka/pull/18268#discussion_r1903887153


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * ConfiguredInternalTopic captures the properties required for configuring the internal topics we create for change-logs and repartitioning
+ * etc.
+ * <p>
+ * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ */
+public record ConfiguredInternalTopic(String name,
+                                      Map<String, String> topicConfigs,
+                                      int numberOfPartitions,
+                                      Optional<Short> replicationFactor) {

Review Comment:
   nit:
   IMO the following order of parameters would be better:
   ```java
   public record ConfiguredInternalTopic(String name,
                                         int numberOfPartitions,
                                         Optional<Short> replicationFactor,
                                         Map<String, String> topicConfigs) {
   ```
   Basically, first the mandatory parameters, then the optional specific config, and then the rest of the configs.
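
   A minimal sketch of how a call site could read with this ordering. The wrapper class `ParameterOrderSketch`, the helper `example()`, and the topic name are hypothetical and only there to make the snippet self-contained; the record is a stand-in that carries nothing but the four components from the PR:

   ```java
   import java.util.Map;
   import java.util.Optional;

   public class ParameterOrderSketch {

       // Stand-in for the record in the PR; only the component order matters here.
       public record ConfiguredInternalTopic(String name,
                                             int numberOfPartitions,
                                             Optional<Short> replicationFactor,
                                             Map<String, String> topicConfigs) { }

       // Hypothetical helper showing the call site: mandatory values first,
       // then the optional replication factor, then the remaining configs.
       public static ConfiguredInternalTopic example() {
           return new ConfiguredInternalTopic(
               "app-store-changelog",                 // hypothetical topic name
               3,                                     // number of partitions
               Optional.empty(),                      // replication factor not set
               Map.of("cleanup.policy", "compact"));  // remaining topic configs
       }

       public static void main(String[] args) {
           System.out.println(example());
       }
   }
   ```

   With this order, the arguments at the call site go from the most essential to the most general.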



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of a subtopology.
+ * <p>
+ * The subtopology is configured according to the number of partitions available in the source topics. It has regular expressions already
+ * resolved and defined exactly the information that is being used by streams groups assignment reconciliation.
+ * <p>
+ * Configured subtopologies may be recreated every time the input topics used by the subtopology are modified.
+ */
+public record ConfiguredSubtopology(Set<String> repartitionSinkTopics,
+                                    Set<String> sourceTopics,
+                                    Map<String, ConfiguredInternalTopic> repartitionSourceTopics,
+                                    Map<String, ConfiguredInternalTopic> stateChangelogTopics) {

Review Comment:
   nit:
   Also here I would prefer a different order. First source, then sink, finally changelog.
   ```suggestion
   public record ConfiguredSubtopology(Set<String> sourceTopics,
                                       Map<String, ConfiguredInternalTopic> repartitionSourceTopics,
                                       Set<String> repartitionSinkTopics,
                                       Map<String, ConfiguredInternalTopic> stateChangelogTopics) {
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredTopologyTest {
+
+    @Test
+    public void testIsReady() {
+        ConfiguredTopology readyTopology = new ConfiguredTopology(
+            1, new HashMap<>(), new HashMap<>(), Optional.empty());
+        assertTrue(readyTopology.isReady());
+
+        ConfiguredTopology notReadyTopology = new ConfiguredTopology(
+            1, new HashMap<>(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+        assertFalse(notReadyTopology.isReady());
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeTopology() {
+        int topologyEpoch = 1;

Review Comment:
   nit:
   See my comment on `ConfiguredSubtopologyTest` about test structure.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * ConfiguredInternalTopic captures the properties required for configuring the internal topics we create for change-logs and repartitioning
+ * etc.
+ * <p>
+ * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ */

Review Comment:
   Should we try to make the javadocs a bit more consistent?
   
   For example:
   ```suggestion
    * Captures the properties required for configuring the internal topics we create for changelogs and repartitioning etc.
    * <p>
    * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker
    * changes, the internal topic may need to be reconfigured.
    */
   ```
   That would make it more similar to the javadocs of `ConfiguredSubtopology`.
   
   Also, adding docs for the parameters, as you did in `ConfiguredTopology`, makes sense.
   
   Good class documentation is beneficial.
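
   A rough sketch of what the parameter docs could look like, using `@param` tags for the record components. The wording of each `@param` description is my guess, not taken from the PR, and the wrapper class `RecordJavadocSketch` is hypothetical and only there to keep the snippet self-contained:

   ```java
   import java.util.Map;
   import java.util.Optional;

   public class RecordJavadocSketch {

       /**
        * Captures the properties required for configuring the internal topics we create for changelogs and repartitioning etc.
        * <p>
        * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the
        * topics on the broker change, the internal topic may need to be reconfigured.
        *
        * @param name               The name of the internal topic (assumed wording).
        * @param topicConfigs       The topic-level configs of the internal topic (assumed wording).
        * @param numberOfPartitions The number of partitions of the internal topic (assumed wording).
        * @param replicationFactor  The replication factor of the internal topic, if one is specified (assumed wording).
        */
       public record ConfiguredInternalTopic(String name,
                                             Map<String, String> topicConfigs,
                                             int numberOfPartitions,
                                             Optional<Short> replicationFactor) { }

       public static void main(String[] args) {
           // Hypothetical values, only to show that the documented record compiles and can be used.
           System.out.println(new ConfiguredInternalTopic("example-topic", Map.of(), 1, Optional.empty()));
       }
   }
   ```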



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredSubtopologyTest {
+
+    @Test
+    public void testAsStreamsGroupDescribeSubtopology() {
+        String subtopologyId = "subtopology1";
+
+        Set<String> sourceTopics = new HashSet<>(Set.of("sourceTopic1", "sourceTopic2"));
+        Set<String> repartitionSinkTopics = new HashSet<>(Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"));
+
+        ConfiguredInternalTopic internalTopicMock = mock(ConfiguredInternalTopic.class);
+        StreamsGroupDescribeResponseData.TopicInfo topicInfo = new StreamsGroupDescribeResponseData.TopicInfo();
+        when(internalTopicMock.asStreamsGroupDescribeTopicInfo()).thenReturn(topicInfo);
+
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = Map.of("repartitionSourceTopic1", internalTopicMock);
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = Map.of("stateChangelogTopic1", internalTopicMock);
+
+        ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(
+            repartitionSinkTopics, sourceTopics, repartitionSourceTopics, stateChangelogTopics);

Review Comment:
   nit:
   IMO, test code is easier to read if it has three blocks: first setup, then the call under test, and finally verification. There are exceptions to this structure, but I believe this test would benefit from it.
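
   To illustrate the three blocks, here is a minimal sketch in plain Java. It is not the test from this PR: the `Subtopology` record, its `sortedSourceTopics()` method, and the class `ThreeBlockTestSketch` are hypothetical stand-ins, and a hand-rolled check replaces JUnit so that the snippet is self-contained:

   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.TreeSet;

   public class ThreeBlockTestSketch {

       // Hypothetical stand-in for the class under test.
       public record Subtopology(Set<String> sourceTopics) {
           public List<String> sortedSourceTopics() {
               return List.copyOf(new TreeSet<>(sourceTopics));
           }
       }

       public static void testSortedSourceTopics() {
           // Setup: build all inputs and the object under test.
           Subtopology subtopology = new Subtopology(Set.of("sourceTopic2", "sourceTopic1"));
           List<String> expected = List.of("sourceTopic1", "sourceTopic2");

           // Call under test: exactly one call to the method being tested.
           List<String> actual = subtopology.sortedSourceTopics();

           // Verification: compare the result against the expectation.
           if (!expected.equals(actual)) {
               throw new AssertionError("expected " + expected + " but was " + actual);
           }
       }

       public static void main(String[] args) {
           testSortedSourceTopics();
       }
   }
   ```

   Keeping the expected value in the setup block leaves the verification block with nothing but the comparison.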



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * ConfiguredInternalTopic captures the properties required for configuring the internal topics we create for change-logs and repartitioning

Review Comment:
   ```suggestion
    * ConfiguredInternalTopic captures the properties required for configuring the internal topics we create for changelogs and repartitioning
   ```


