bbejeck commented on code in PR #19722:
URL: https://github.com/apache/kafka/pull/19722#discussion_r2125005760


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java:
##########
@@ -113,14 +103,4 @@ void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
     void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
         assertThrows(NoSuchElementException.class, () -> topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
     }
-
-    @Test

Review Comment:
   same here



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java:
##########
@@ -30,17 +30,23 @@
  * <p>
  * Configured subtopologies may be recreated every time the input topics used by the subtopology are modified.
  *
+ * @param numberOfTasks           Precomputed number of tasks. Note that not every source topic may have a partition for
+ *                                every task.

Review Comment:
   I'm assuming this covers cases where, say, topic A has 3 partitions and topic B
has 5. Maybe update this to add something like `in cases where there are
multiple source topics with an unequal number of partitions`.
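
To make the 3-versus-5 example concrete, here is a minimal sketch. It assumes the task count of a subtopology is the largest partition count among its source topics, which is how I read `numberOfTasks` and `maxNumInputPartitions`; the PR may compute it differently. `TaskCountSketch` and both of its methods are hypothetical names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not code from the PR.
class TaskCountSketch {

    // Assumption: one task per partition index of the largest source topic.
    static int numberOfTasks(Map<String, Integer> partitionsByTopic) {
        return partitionsByTopic.values().stream()
            .mapToInt(Integer::intValue)
            .max()
            .orElse(0);
    }

    // Source topics (in sorted order) that have a partition for the given task index.
    static List<String> topicsWithPartition(Map<String, Integer> partitionsByTopic, int task) {
        List<String> topics = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : new TreeMap<>(partitionsByTopic).entrySet()) {
            if (task < entry.getValue()) {
                topics.add(entry.getKey());
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = Map.of("A", 3, "B", 5);
        System.out.println(numberOfTasks(partitions));          // prints 5
        System.out.println(topicsWithPartition(partitions, 4)); // prints [B]
    }
}
```

With these inputs, tasks 3 and 4 exist but read only from topic B, which is the situation the javadoc note describes.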



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java:
##########
@@ -77,18 +76,9 @@ void testIsStateful() {
 
     @Test
     void testMaxNumInputPartitions() {
-        ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class);
         ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
         subtopologyMap.put("subtopology1", subtopology);
-        when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
-        when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic", internalTopic));

Review Comment:
   Why remove this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
         }
     }
 
+    /**
+     * Validates that the requested tasks exist in the configured topology and partitions are valid.
+     * If tasks is null, does nothing. If an invalid task is found, throws InvalidRequestException.
+     *
+     * @param topology The configured topology.

Review Comment:
   The javadoc needs to be updated to match the parameter name: `topology` -> `subtopologySortedMap`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
         }
     }
 
+    /**
+     * Validates that the requested tasks exist in the configured topology and partitions are valid.
+     * If tasks is null, does nothing. If an invalid task is found, throws InvalidRequestException.
+     *
+     * @param topology The configured topology.
+     * @param tasks    The list of requested tasks.
+     */
+    private static void throwIfRequestContainsInvalidTasks(
+        SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+        List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+    ) {
+        if (tasks == null) return;
+        for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+            String subtopologyId = task.subtopologyId();
+            if (!subtopologySortedMap.containsKey(subtopologyId)) {
+                throw new InvalidRequestException("Subtopology " + subtopologyId + " does not exist in the topology.");
+            }
+            ConfiguredSubtopology subtopology = subtopologySortedMap.get(subtopologyId);
+            int numTasks = subtopology.numberOfTasks();
+            for (Integer partition : task.partitions()) {

Review Comment:
   nit/optional - do we need to check for empty partitions? 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
         }
     }
 
+    /**
+     * Validates that the requested tasks exist in the configured topology and partitions are valid.
+     * If tasks is null, does nothing. If an invalid task is found, throws InvalidRequestException.
+     *
+     * @param topology The configured topology.
+     * @param tasks    The list of requested tasks.
+     */
+    private static void throwIfRequestContainsInvalidTasks(
+        SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+        List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+    ) {
+        if (tasks == null) return;
+        for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+            String subtopologyId = task.subtopologyId();
+            if (!subtopologySortedMap.containsKey(subtopologyId)) {
+                throw new InvalidRequestException("Subtopology " + subtopologyId + " does not exist in the topology.");
+            }
+            ConfiguredSubtopology subtopology = subtopologySortedMap.get(subtopologyId);

Review Comment:
   nit: move this lookup up and change the check to `if (subtopology == null) ..`.
It's a minor point, so I'll leave it up to you.
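
A rough sketch of what the single-lookup version could look like. The types are stand-ins so the snippet compiles on its own: `Subtopology` stands in for `ConfiguredSubtopology`, `IllegalArgumentException` stands in for `InvalidRequestException`, and `SingleLookupSketch` / `requireSubtopology` are hypothetical names, not part of the PR.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration only; not code from the PR.
class SingleLookupSketch {

    // Stand-in for ConfiguredSubtopology, reduced to the accessor used in the hunk.
    static final class Subtopology {
        private final int numberOfTasks;

        Subtopology(int numberOfTasks) {
            this.numberOfTasks = numberOfTasks;
        }

        int numberOfTasks() {
            return numberOfTasks;
        }
    }

    // One get() replaces the containsKey() + get() pair.
    static Subtopology requireSubtopology(SortedMap<String, Subtopology> subtopologySortedMap,
                                          String subtopologyId) {
        Subtopology subtopology = subtopologySortedMap.get(subtopologyId);
        if (subtopology == null) {
            // IllegalArgumentException stands in for InvalidRequestException here.
            throw new IllegalArgumentException(
                "Subtopology " + subtopologyId + " does not exist in the topology.");
        }
        return subtopology;
    }

    public static void main(String[] args) {
        SortedMap<String, Subtopology> map = new TreeMap<>();
        map.put("subtopology1", new Subtopology(5));
        System.out.println(requireSubtopology(map, "subtopology1").numberOfTasks()); // prints 5
    }
}
```

The two forms only differ if the map can hold `null` values, in which case `containsKey` would pass and the later dereference would fail; I'm assuming that never happens for the configured topology.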



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
         }
     }
 
+    /**
+     * Validates that the requested tasks exist in the configured topology and partitions are valid.
+     * If tasks is null, does nothing. If an invalid task is found, throws InvalidRequestException.
+     *
+     * @param topology The configured topology.
+     * @param tasks    The list of requested tasks.
+     */
+    private static void throwIfRequestContainsInvalidTasks(
+        SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+        List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+    ) {
+        if (tasks == null) return;

Review Comment:
   maybe expand to include `tasks.isEmpty()` ?
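
For reference, a small sketch of the expanded guard. An empty list would already fall through the `for` loop without doing anything, so the extra check mostly makes the intent explicit. `EarlyReturnSketch` and `inspectedTasks` are hypothetical names, and the loop body is a placeholder for the real per-task validation.

```java
import java.util.List;

// Hypothetical illustration only; not code from the PR.
class EarlyReturnSketch {

    // Returns how many entries would be handed to the per-task validation.
    static int inspectedTasks(List<String> subtopologyIds) {
        if (subtopologyIds == null || subtopologyIds.isEmpty()) {
            return 0; // nothing to validate
        }
        int inspected = 0;
        for (String ignored : subtopologyIds) {
            inspected++; // placeholder for the real per-task checks
        }
        return inspected;
    }

    public static void main(String[] args) {
        System.out.println(inspectedTasks(null));             // prints 0
        System.out.println(inspectedTasks(List.of()));        // prints 0
        System.out.println(inspectedTasks(List.of("0", "1"))); // prints 2
    }
}
```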



