ableegoldman commented on code in PR #16114:
URL: https://github.com/apache/kafka/pull/16114#discussion_r1617974814


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() != 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    continue;
+                }

Review Comment:
   as an aside, please limit the use of `continue` in loops. It makes the code 
flow harder to follow than a single direction of logic inside the 
conditional, as in 
   ```
   if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
       // do stuff
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {

Review Comment:
   (if you're wondering how the 
`ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS` check works in that 
loop: as long as some version of that taskId is already assigned to that 
client, we know it must be an active and a standby task assigned at the same 
time. That's because an AssignedTask with the same taskId and type is 
considered equivalent and the assignment uses a Set, so there can only be one 
copy of a given taskId-taskType combination in each 
KafkaStreamsAssignment#assignment)
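
   To make the Set argument concrete, here's a rough standalone sketch. 
`AssignedTaskSketch` is a hypothetical stand-in, not the real 
`KafkaStreamsAssignment.AssignedTask`, and it assumes equality is based on 
(taskId, type) as described above; task ids are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for KafkaStreamsAssignment.AssignedTask. Equality is
// ASSUMED to be based on (taskId, type), as described in the comment above.
final class AssignedTaskSketch {
    enum Type { ACTIVE, STANDBY }

    final String taskId;
    final Type type;

    AssignedTaskSketch(final String taskId, final Type type) {
        this.taskId = taskId;
        this.type = type;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof AssignedTaskSketch)) {
            return false;
        }
        final AssignedTaskSketch other = (AssignedTaskSketch) o;
        return taskId.equals(other.taskId) && type == other.type;
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId, type);
    }

    // Returns true if the same taskId shows up twice in one client's assignment.
    // The Set holds at most one entry per (taskId, type), so a repeated taskId
    // can only mean one ACTIVE copy plus one STANDBY copy.
    static boolean hasActiveAndStandbyOfSameTask(final Set<AssignedTaskSketch> assignment) {
        final Map<String, Type> seen = new HashMap<>();
        for (final AssignedTaskSketch task : assignment) {
            if (seen.containsKey(task.taskId)) {
                return true;
            }
            seen.put(task.taskId, task.type);
        }
        return false;
    }

    public static void main(final String[] args) {
        final Set<AssignedTaskSketch> assignment = new LinkedHashSet<>();
        assignment.add(new AssignedTaskSketch("0_1", Type.ACTIVE));
        assignment.add(new AssignedTaskSketch("0_1", Type.ACTIVE)); // equal entry, dropped by the Set
        System.out.println(hasActiveAndStandbyOfSameTask(assignment));
        assignment.add(new AssignedTaskSketch("0_1", Type.STANDBY));
        System.out.println(hasActiveAndStandbyOfSameTask(assignment));
    }
}
```

   The second ACTIVE entry never makes it into the Set, so only the 
ACTIVE + STANDBY pair can trip the `containsKey` check.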



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {

Review Comment:
   I know I originally suggested to have each of these as separate checks, so 
that's on me, but if we're looping over the same thing in the same way over and 
over then we may as well combine the checks. I also think we can make this a 
bit easier to follow by doing so (imo at least). For example we can combine all 
of these into a single nested loop with something like this:
   
   ```
   final Set<TaskId> activeTasks = new HashSet<>();
   final Set<TaskId> standbyTasks = new HashSet<>();
   for (final KafkaStreamsAssignment assignment : assignments) {
       final Map<TaskId, AssignedTask.Type> tasksForAssignment = new HashMap<>();
       for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {

           // only a second *active* copy is an error here; a standby for a
           // task that is active on another client is perfectly valid
           if (task.type() == AssignedTask.Type.ACTIVE && activeTasks.contains(task.id())) {
               return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
           }

           if (tasksForAssignment.containsKey(task.id())) {
               return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
           }

           tasksForAssignment.put(task.id(), task.type());
           if (task.type() == AssignedTask.Type.ACTIVE) {
               activeTasks.add(task.id());
           } else {
               standbyTasks.add(task.id());
           }
       }
   }

   for (final TaskInfo task : applicationState.allTasks()) {
       if (!task.isStateful() && standbyTasks.contains(task.id())) {
           return AssignmentError.INVALID_STANDBY_TASK;
       }
   }
   ```
   
   Maybe it's just me but that feels much more to the point. We can even 
tighten up that last loop by having `ApplicationState#allTasks` return a 
`Map<TaskId, TaskInfo>` and then only looping over the `standbyTasks`. It's not 
really worth worrying about, since not many people will have a huge number of 
stateless tasks and the time wasted looping over them is relatively small, but 
I was thinking that it might make sense to change the 
`ApplicationState#allTasks` API to return a map anyway so that it's easy to 
look up info for a specific task (which might be useful for a custom assignor). 
Thoughts?
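
   For illustration, here's roughly what the tightened-up last loop could look 
like if `allTasks` were a map. This is only a sketch: `TaskInfoSketch` and 
`firstInvalidStandby` are hypothetical names, and task ids are plain strings 
instead of `TaskId`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class StandbyCheckSketch {
    // Hypothetical stand-in for TaskInfo; only the stateful flag matters here.
    static final class TaskInfoSketch {
        private final boolean stateful;

        TaskInfoSketch(final boolean stateful) {
            this.stateful = stateful;
        }

        boolean isStateful() {
            return stateful;
        }
    }

    // Loops over the (usually small) set of assigned standbys and looks each one
    // up in the map, instead of looping over every task in the application.
    // Unknown task ids are skipped here and left to a separate check.
    static Optional<String> firstInvalidStandby(final Map<String, TaskInfoSketch> allTasks,
                                                final Set<String> standbyTasks) {
        for (final String taskId : standbyTasks) {
            final TaskInfoSketch info = allTasks.get(taskId);
            if (info != null && !info.isStateful()) {
                return Optional.of(taskId);
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, TaskInfoSketch> allTasks = new HashMap<>();
        allTasks.put("0_0", new TaskInfoSketch(true));   // stateful
        allTasks.put("1_0", new TaskInfoSketch(false));  // stateless

        final Set<String> standbyTasks = new HashSet<>();
        standbyTasks.add("0_0");
        System.out.println(firstInvalidStandby(allTasks, standbyTasks).isPresent());

        standbyTasks.add("1_0");
        System.out.println(firstInvalidStandby(allTasks, standbyTasks).orElse("none"));
    }
}
```

   Returning the offending id rather than just a boolean also keeps it handy 
for an error message.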



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() != 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    continue;
+                }
+
+                if (activeTasks.contains(task.id())) {
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;

Review Comment:
   we should log an error message any time we find an error before returning 
from this method, so we can include the offending taskId/processId/etc in the 
log message while we still have that info in scope. 
   
   so ditto for everywhere else we return an error code
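
   Something along these lines is what I have in mind. It's a standalone 
sketch, so it uses `java.util.logging` as a stand-in for whatever logger the 
class already has, and the `validate` method, the `Error` enum and the 
String-keyed map are all made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogBeforeReturnSketch {
    // Stand-in logger; the real class would use its existing logger instead.
    private static final Logger LOG = Logger.getLogger(LogBeforeReturnSketch.class.getName());

    // Hypothetical, trimmed-down version of the error codes.
    enum Error { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES }

    // Input is processId -> active task ids (plain strings for brevity).
    static Error validate(final Map<String, List<String>> activeTasksByClient) {
        final Map<String, String> ownerByTask = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : activeTasksByClient.entrySet()) {
            final String processId = entry.getKey();
            for (final String taskId : entry.getValue()) {
                final String previousOwner = ownerByTask.put(taskId, processId);
                if (previousOwner != null) {
                    // Log while taskId and both process ids are still in scope.
                    LOG.log(Level.SEVERE,
                            "Active task {0} was assigned to client {1} and again to client {2}",
                            new Object[] {taskId, previousOwner, processId});
                    return Error.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
            }
        }
        return Error.NONE;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("client-a", Arrays.asList("0_0", "0_1"));
        assignment.put("client-b", Arrays.asList("0_1"));
        System.out.println(validate(assignment));
    }
}
```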



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() != 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    continue;
+                }
+
+                if (activeTasks.contains(task.id())) {
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+                }
+                activeTasks.add(task.id());
+            }
+        }
+
+        // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    activeTasksForAssignment.add(task.id());
+                }
+            }
+
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    if (activeTasksForAssignment.contains(task.id())) {
+                        return 
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+                    }
+                }
+            }
+        }
+
+        // Check for INVALID_STANDBY_TASK
+        final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    standbyTasksInOutput.add(task.id());
+                }
+            }
+        }
+        for (final TaskInfo task : applicationState.allTasks()) {
+            if (!task.isStateful() && 
standbyTasksInOutput.contains(task.id())) {
+                return AssignmentError.INVALID_STANDBY_TASK;
+            }
+        }
+
+        // Check for MISSING_PROCESS_ID
+        final Map<ProcessId, KafkaStreamsState> clientStates = 
applicationState.kafkaStreamsStates(false);
+        final Set<ProcessId> clientsInOutput = 
assignments.stream().map(KafkaStreamsAssignment::processId)
+            .collect(Collectors.toSet());
+        for (final Map.Entry<ProcessId, KafkaStreamsState> entry : 
clientStates.entrySet()) {
+            final ProcessId processIdInInput = entry.getKey();
+            if (!clientsInOutput.contains(processIdInInput)) {
+                return AssignmentError.MISSING_PROCESS_ID;
+            }
+        }
+
+        // Check for UNKNOWN_PROCESS_ID
+        final Set<ProcessId> clientsInInput = 
clientStates.entrySet().stream().map(Map.Entry::getKey)

Review Comment:
   fyi java maps have `keySet` and `values` methods that let you extract just 
part of the map. So this could be simplified to just `clientStates.keySet()` 
and there's no need to build up a new set.
   
   That said, we can just directly check `if 
(!clientStates.containsKey(processIdInOutput))` so there's really no need for 
the `clientsInInput` variable to be introduced at all. Or better yet, since we 
only need the key/processId to begin with, combine both of the above 
suggestions and just declare the `clientStates` right off the bat as follows to 
really tighten things up (the check then becomes `clientStates.contains(...)`):
   
   ```
   final Set<ProcessId> clientStates = applicationState.kafkaStreamsStates(false).keySet();
   ```
   
   (Just trying to keep the number of new variables introduced to a minimum 
since it's complicated enough already even with just the preexisting data 
structures)
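
   In case it helps, here's a tiny standalone sketch of the `keySet` approach. 
The class and method names are hypothetical, and process ids and client state 
are plain strings rather than `ProcessId`/`KafkaStreamsState`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class KeySetSketch {
    // Returns the first process id in the output that is not a key of the input
    // map. keySet() is a view backed by the map, so nothing is copied here.
    static Optional<String> firstUnknownProcessId(final Map<String, String> clientStates,
                                                  final Collection<String> processIdsInOutput) {
        final Set<String> knownClients = clientStates.keySet();
        for (final String processId : processIdsInOutput) {
            if (!knownClients.contains(processId)) {
                return Optional.of(processId);
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, String> clientStates = new HashMap<>();
        clientStates.put("client-a", "state-a");
        clientStates.put("client-b", "state-b");

        System.out.println(firstUnknownProcessId(clientStates, Arrays.asList("client-a", "client-b")));
        System.out.println(firstUnknownProcessId(clientStates, Arrays.asList("client-a", "client-z")));
    }
}
```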



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() != 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    continue;
+                }
+
+                if (activeTasks.contains(task.id())) {
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+                }
+                activeTasks.add(task.id());
+            }
+        }
+
+        // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    activeTasksForAssignment.add(task.id());
+                }
+            }
+
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    if (activeTasksForAssignment.contains(task.id())) {
+                        return 
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+                    }
+                }
+            }
+        }
+
+        // Check for INVALID_STANDBY_TASK
+        final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    standbyTasksInOutput.add(task.id());
+                }
+            }
+        }
+        for (final TaskInfo task : applicationState.allTasks()) {
+            if (!task.isStateful() && 
standbyTasksInOutput.contains(task.id())) {
+                return AssignmentError.INVALID_STANDBY_TASK;
+            }
+        }
+
+        // Check for MISSING_PROCESS_ID
+        final Map<ProcessId, KafkaStreamsState> clientStates = 
applicationState.kafkaStreamsStates(false);
+        final Set<ProcessId> clientsInOutput = 
assignments.stream().map(KafkaStreamsAssignment::processId)
+            .collect(Collectors.toSet());
+        for (final Map.Entry<ProcessId, KafkaStreamsState> entry : 
clientStates.entrySet()) {
+            final ProcessId processIdInInput = entry.getKey();
+            if (!clientsInOutput.contains(processIdInInput)) {
+                return AssignmentError.MISSING_PROCESS_ID;
+            }
+        }
+
+        // Check for UNKNOWN_PROCESS_ID
+        final Set<ProcessId> clientsInInput = 
clientStates.entrySet().stream().map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+        for (final ProcessId processIdInOutput : clientsInOutput) {
+            if (!clientsInInput.contains(processIdInOutput)) {
+                return AssignmentError.UNKNOWN_PROCESS_ID;
+            }
+        }
+
+        // Check for UNKNOWN_TASK_ID
+        final Set<TaskId> taskIdsInInput = 
applicationState.allTasks().stream().map(TaskInfo::id)

Review Comment:
   Ah, changing the `ApplicationState#allTasks` method to return a `Map<TaskId, 
TaskInfo>` would let us simplify this as well. Let's make that change, I'll 
update the KIP.
   
   Then we can remove this line altogether and move the check, i.e. return 
`AssignmentError.UNKNOWN_TASK_ID` if 
`!applicationState.allTasks().containsKey(task.id())`, inside the single loop I 
outlined in a comment above
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long 
encodedNextScheduledRebal
         }
     }
 
+    private AssignmentError validateTaskAssignment(final ApplicationState 
applicationState,
+                                                   final TaskAssignment 
taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = 
taskAssignment.assignment();
+
+        // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+        final Set<TaskId> activeTasks = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() != 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    continue;
+                }
+
+                if (activeTasks.contains(task.id())) {
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+                }
+                activeTasks.add(task.id());
+            }
+        }
+
+        // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    activeTasksForAssignment.add(task.id());
+                }
+            }
+
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    if (activeTasksForAssignment.contains(task.id())) {
+                        return 
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+                    }
+                }
+            }
+        }
+
+        // Check for INVALID_STANDBY_TASK
+        final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : 
assignment.assignment()) {
+                if (task.type() == 
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+                    standbyTasksInOutput.add(task.id());
+                }
+            }
+        }
+        for (final TaskInfo task : applicationState.allTasks()) {
+            if (!task.isStateful() && 
standbyTasksInOutput.contains(task.id())) {
+                return AssignmentError.INVALID_STANDBY_TASK;
+            }
+        }
+
+        // Check for MISSING_PROCESS_ID

Review Comment:
   nit: no need for the comments, "the code should stand for itself" as one of 
the committers will always say. We typically prefer to limit inline comments to 
things that explain "why" some code is doing something, not "what" some code 
is doing


