ableegoldman commented on code in PR #16201:
URL: https://github.com/apache/kafka/pull/16201#discussion_r1628527914
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -53,6 +55,76 @@ public final class TaskAssignmentUtils {
     private TaskAssignmentUtils() {}
 
+    /**
+     * Return an {@code AssignmentError} for a task assignment created for an application.
+     *
+     * @param applicationState The application for which this task assignment is being assessed.
+     * @param taskAssignment The task assignment that will be validated.
+     *
+     * @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
+     *         or another {@code AssignmentError} otherwise.
+     */
+    public static AssignmentError validateTaskAssignment(final ApplicationState applicationState,
+                                                         final TaskAssignment taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = taskAssignment.assignment();
+        final Map<TaskId, ProcessId> activeTasksInOutput = new HashMap<>();
+        final Map<TaskId, ProcessId> standbyTasksInOutput = new HashMap<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
+                if (activeTasksInOutput.containsKey(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    LOG.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}",
+                        task.id(), assignment.processId().id(), activeTasksInOutput.get(task.id()).id());
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+                }
+
+                if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    activeTasksInOutput.put(task.id(), assignment.processId());
+                } else {
+                    standbyTasksInOutput.put(task.id(), assignment.processId());
+                }
+            }
+        }
+
+        for (final TaskInfo task : applicationState.allTasks().values()) {
+            if (!task.isStateful() && standbyTasksInOutput.containsKey(task.id())) {
+                LOG.error("Assignment is invalid: standby task for stateless task {} was assigned to KafkaStreams client {}",
+                    task.id(), standbyTasksInOutput.get(task.id()).id());
+                return AssignmentError.INVALID_STANDBY_TASK;
+            }
+        }
+
+        final Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
+        final Set<ProcessId> clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId)
+            .collect(Collectors.toSet());
+        for (final Map.Entry<ProcessId, KafkaStreamsState> entry : clientStates.entrySet()) {
+            final ProcessId processIdInInput = entry.getKey();
+            if (!clientsInOutput.contains(processIdInInput)) {
+                LOG.error("Assignment is invalid: KafkaStreams client {} has no assignment", processIdInInput.id());
+                return AssignmentError.MISSING_PROCESS_ID;
+            }
+        }
+
+        for (final ProcessId processIdInOutput : clientsInOutput) {
+            if (!clientStates.containsKey(processIdInOutput)) {
+                LOG.error("Assignment is invalid: the KafkaStreams client {} is unknown", processIdInOutput.id());
+                return AssignmentError.UNKNOWN_PROCESS_ID;
+            }
+        }
+
+        final Set<TaskId> taskIdsInInput = applicationState.allTasks().keySet();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
+                if (!taskIdsInInput.contains(task.id())) {
+                    LOG.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown",
+                        task.id(), assignment.processId().id());
+                    return AssignmentError.UNKNOWN_TASK_ID;

Review Comment:
Just noticed we're doing exactly the same loop here as we are at the beginning of this method. Can we just move the contents into that loop? (not a performance thing, I just think it's easier to read if we don't have a bunch of individual loops where I have to re-check the loop conditions each time)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
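The consolidation suggested in the review comment could look roughly like the following. This is a minimal sketch that assumes heavily simplified stand-ins for the Kafka Streams types. `TaskId` and `ProcessId` are reduced to `String`. The names `SingleLoopValidation`, `ClientAssignment`, `AssignedTask`, `TaskType`, and `validate` are hypothetical and are not Kafka APIs. Logging is omitted. The unknown-task check from the trailing loop moves into the first loop, so every per-task rule runs in a single pass over (client, task) pairs. The stateless-standby check and the client-level checks would stay where they are.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for the Kafka Streams types in the diff:
// TaskId and ProcessId are plain Strings, and only the error codes this sketch
// can produce are modeled. None of these names are Kafka APIs.
public class SingleLoopValidation {

    enum TaskType { ACTIVE, STANDBY }

    enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, UNKNOWN_TASK_ID }

    record AssignedTask(String taskId, TaskType type) { }

    record ClientAssignment(String processId, List<AssignedTask> tasks) { }

    static AssignmentError validate(final Set<String> taskIdsInInput,
                                    final List<ClientAssignment> assignments) {
        final Map<String, String> activeTasksInOutput = new HashMap<>();
        // Still populated for the stateless-standby check, which this sketch omits.
        final Map<String, String> standbyTasksInOutput = new HashMap<>();

        // A single pass over every (client, task) pair holds all per-task checks.
        for (final ClientAssignment assignment : assignments) {
            for (final AssignedTask task : assignment.tasks()) {
                // An active task may be owned by at most one client.
                if (task.type() == TaskType.ACTIVE && activeTasksInOutput.containsKey(task.taskId())) {
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
                // Formerly a separate trailing loop: every assigned task must exist in the input.
                if (!taskIdsInInput.contains(task.taskId())) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                if (task.type() == TaskType.ACTIVE) {
                    activeTasksInOutput.put(task.taskId(), assignment.processId());
                } else {
                    standbyTasksInOutput.put(task.taskId(), assignment.processId());
                }
            }
        }

        // The stateless-standby and client-level (process ID) checks would follow here unchanged.
        return AssignmentError.NONE;
    }

    public static void main(final String[] args) {
        final Set<String> known = Set.of("0_0", "0_1");
        final List<ClientAssignment> assignments = List.of(
            new ClientAssignment("client-a", List.of(new AssignedTask("0_0", TaskType.ACTIVE))),
            new ClientAssignment("client-b", List.of(new AssignedTask("0_0", TaskType.STANDBY),
                                                     new AssignedTask("0_1", TaskType.ACTIVE))));
        System.out.println(validate(known, assignments)); // prints NONE
    }
}
```

One side effect is worth weighing. After the merge, an unknown task ID is reported before `INVALID_STANDBY_TASK`, `MISSING_PROCESS_ID`, or `UNKNOWN_PROCESS_ID`. An assignment with several problems may therefore return a different `AssignmentError` than the current ordering does.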