ableegoldman commented on code in PR #16114:
URL: https://github.com/apache/kafka/pull/16114#discussion_r1617974814
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() !=
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ continue;
+ }
Review Comment:
as an aside, please limit the use of `continue` in loops. It makes the code
flow harder to follow than a single direction of logic within the
conditional, as in
```
if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
    // do stuff
}
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
Review Comment:
(if you're wondering how the
`ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS` check works in that
loop: as long as some version of that taskId is already assigned to that
client, we know it must be an active and a standby task assigned at the same
time. An AssignedTask with the same taskId and type is considered equivalent,
and the assignment uses a Set, so there can only be one copy of a given
taskId-taskType combination in each KafkaStreamsAssignment#assignment)
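To make the Set argument concrete, here is a minimal sketch. The `AssignedTask` class below is a hypothetical stand-in (string ids, equality on the id/type pair as described above), not the real `KafkaStreamsAssignment.AssignedTask`, and `hasActiveAndStandbyForSameTask` is a name made up for illustration:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class AssignedTaskSetSketch {
    enum Type { ACTIVE, STANDBY }

    // Hypothetical stand-in: two tasks are equal iff id and type both match.
    static final class AssignedTask {
        final String id;
        final Type type;

        AssignedTask(final String id, final Type type) {
            this.id = id;
            this.type = type;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof AssignedTask)) {
                return false;
            }
            final AssignedTask other = (AssignedTask) o;
            return id.equals(other.id) && type == other.type;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, type);
        }
    }

    // A task id seen twice within one Set can only be one ACTIVE plus one
    // STANDBY copy, because identical (id, type) pairs collapse in the Set.
    static boolean hasActiveAndStandbyForSameTask(final Set<AssignedTask> tasks) {
        final Set<String> seenIds = new HashSet<>();
        for (final AssignedTask task : tasks) {
            if (!seenIds.add(task.id)) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final Set<AssignedTask> tasks = new HashSet<>();
        tasks.add(new AssignedTask("0_0", Type.ACTIVE));
        tasks.add(new AssignedTask("0_0", Type.ACTIVE)); // collapsed by the Set
        System.out.println(tasks.size());                           // prints 1
        System.out.println(hasActiveAndStandbyForSameTask(tasks));  // prints false
        tasks.add(new AssignedTask("0_0", Type.STANDBY));
        System.out.println(hasActiveAndStandbyForSameTask(tasks));  // prints true
    }
}
```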
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
Review Comment:
I know I originally suggested to have each of these as separate checks, so
that's on me, but if we're looping over the same thing in the same way over and
over then we may as well combine the checks. I also think we can make this a
bit easier to follow by doing so (imo at least). For example we can combine all
of these into a single nested loop with something like this:
```
final Set<TaskId> activeTasks = new HashSet<>();
final Set<TaskId> standbyTasks = new HashSet<>();
for (final KafkaStreamsAssignment assignment : assignments) {
    final Map<TaskId, AssignedTask.Type> tasksForAssignment = new HashMap<>();
    for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
        // only a second active copy is an error; a standby of a task that
        // is active on another client is a valid assignment
        if (task.type() == AssignedTask.Type.ACTIVE && activeTasks.contains(task.id())) {
            return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
        }
        if (tasksForAssignment.containsKey(task.id())) {
            return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
        }
        tasksForAssignment.put(task.id(), task.type());
        if (task.type() == AssignedTask.Type.ACTIVE) {
            activeTasks.add(task.id());
        } else {
            standbyTasks.add(task.id());
        }
    }
}
for (final TaskInfo task : applicationState.allTasks()) {
    if (!task.isStateful() && standbyTasks.contains(task.id())) {
        return AssignmentError.INVALID_STANDBY_TASK;
    }
}
```
Maybe it's just me but that feels much more to the point. We can even
tighten up that last loop by having `ApplicationState#allTasks` return a
`Map<TaskId, TaskInfo>` and then only looping over the `standbyTasks`. It's not
really worth worrying about, since not many people will have a huge number of
stateless tasks and the time wasted looping over them is relatively small, but I
was thinking that it might make sense to change the
`ApplicationState#allTasks` API to return a map anyways so that it's easy to
look up info for a specific task (which might be useful for a custom assignor).
Thoughts?
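A rough sketch of what the map-based lookup could look like, assuming `allTasks` did return a map. Everything here is hypothetical: `TaskInfo` is a pared-down stand-in, task ids are plain strings, and `findInvalidStandby` is an illustrative helper rather than anything in the PR:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class StandbyLookupSketch {
    // Hypothetical stand-in for TaskInfo with only the field needed here.
    static final class TaskInfo {
        final boolean stateful;

        TaskInfo(final boolean stateful) {
            this.stateful = stateful;
        }
    }

    // Iterates only over the assigned standby tasks and looks each one up
    // in the map, instead of scanning every task in the application.
    static Optional<String> findInvalidStandby(final Map<String, TaskInfo> allTasks,
                                               final Set<String> standbyTasks) {
        for (final String taskId : standbyTasks) {
            final TaskInfo info = allTasks.get(taskId);
            // Unknown ids are left to a separate check in this sketch.
            if (info != null && !info.stateful) {
                return Optional.of(taskId);
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, TaskInfo> allTasks = new HashMap<>();
        allTasks.put("0_0", new TaskInfo(true));
        allTasks.put("1_0", new TaskInfo(false));

        final Set<String> standbyTasks = new HashSet<>();
        standbyTasks.add("1_0"); // standby for a stateless task: invalid
        System.out.println(findInvalidStandby(allTasks, standbyTasks).isPresent());
    }
}
```

The cost is then proportional to the number of standby tasks rather than the total number of tasks, which matters only when stateless tasks dominate.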
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() !=
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ continue;
+ }
+
+ if (activeTasks.contains(task.id())) {
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
Review Comment:
we should log an error message any time we find an error before returning
from this method, so we can include the offending taskId/processId/etc in the
log message while we still have that info in scope.
so ditto for everywhere else we return an error code
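As an illustration of logging while the details are still in scope, here is a minimal sketch. It uses `java.util.logging` only to stay self-contained (an assumption for this example; the real class would use its existing logger), and the enum, the string ids, and the `validate` signature are all simplified stand-ins:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

class LogBeforeReturnSketch {
    private static final Logger LOG = Logger.getLogger(LogBeforeReturnSketch.class.getName());

    // Hypothetical, trimmed-down error enum for this sketch.
    enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES }

    // activeTasksByProcess: simplified input mapping a process id to the
    // ids of the active tasks assigned to it.
    static AssignmentError validate(final Map<String, List<String>> activeTasksByProcess) {
        final Map<String, String> ownerByTask = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : activeTasksByProcess.entrySet()) {
            final String processId = entry.getKey();
            for (final String taskId : entry.getValue()) {
                final String previousOwner = ownerByTask.putIfAbsent(taskId, processId);
                if (previousOwner != null) {
                    // Log the offending ids here, before they go out of scope.
                    LOG.severe(String.format(
                        "Active task %s was assigned to both %s and %s",
                        taskId, previousOwner, processId));
                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
            }
        }
        return AssignmentError.NONE;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("process-a", Arrays.asList("0_0", "0_1"));
        assignment.put("process-b", Arrays.asList("0_1"));
        System.out.println(validate(assignment));
    }
}
```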
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() !=
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ continue;
+ }
+
+ if (activeTasks.contains(task.id())) {
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+ activeTasks.add(task.id());
+ }
+ }
+
+ // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksForAssignment.add(task.id());
+ }
+ }
+
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ if (activeTasksForAssignment.contains(task.id())) {
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+ }
+ }
+ }
+
+ // Check for INVALID_STANDBY_TASK
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ // Check for MISSING_PROCESS_ID
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ return AssignmentError.MISSING_PROCESS_ID;
+ }
+ }
+
+ // Check for UNKNOWN_PROCESS_ID
+ final Set<ProcessId> clientsInInput =
clientStates.entrySet().stream().map(Map.Entry::getKey)
Review Comment:
fyi Java maps have `keySet` and `values` methods that let you extract
just part of the map. So this could be simplified to just
`clientStates.keySet()` and there's no need to build up a new set.
That said, we can just directly check `if
(!clientStates.containsKey(processIdInOutput))` so there's really no need for the
`clientsInInput` variable to be introduced at all. Or better yet, since we only
need the key/processId to begin with, combine both of the above suggestions and
just declare the `clientStates` right off the bat as follows to really tighten
things up:
```
final Set<ProcessId> clientStates = applicationState.kafkaStreamsStates(false).keySet();
```
(Just trying to keep the number of new fields introduced to a minimum since
it's complicated enough already even with just the preexisting data structures)
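For reference, a small sketch of both process id checks done directly against the map, with no intermediate copy of the keys. Process ids and states are plain strings here, and the helper names `missingProcessIds` and `unknownProcessIds` are invented for the example:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class KeySetSketch {
    // Process ids present in the input but absent from the output.
    static Set<String> missingProcessIds(final Map<String, String> clientStates,
                                         final Set<String> clientsInOutput) {
        final Set<String> missing = new TreeSet<>();
        // keySet() is a view backed by the map, so nothing is copied here.
        for (final String processId : clientStates.keySet()) {
            if (!clientsInOutput.contains(processId)) {
                missing.add(processId);
            }
        }
        return missing;
    }

    // Process ids present in the output but never seen in the input.
    static Set<String> unknownProcessIds(final Map<String, String> clientStates,
                                         final Set<String> clientsInOutput) {
        final Set<String> unknown = new TreeSet<>();
        for (final String processId : clientsInOutput) {
            if (!clientStates.containsKey(processId)) {
                unknown.add(processId);
            }
        }
        return unknown;
    }

    public static void main(final String[] args) {
        final Map<String, String> clientStates = new HashMap<>();
        clientStates.put("process-a", "state-a");
        clientStates.put("process-b", "state-b");
        final Set<String> clientsInOutput = new HashSet<>(Arrays.asList("process-a", "process-c"));

        System.out.println(missingProcessIds(clientStates, clientsInOutput)); // prints [process-b]
        System.out.println(unknownProcessIds(clientStates, clientsInOutput)); // prints [process-c]
    }
}
```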
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() !=
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ continue;
+ }
+
+ if (activeTasks.contains(task.id())) {
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+ activeTasks.add(task.id());
+ }
+ }
+
+ // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksForAssignment.add(task.id());
+ }
+ }
+
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ if (activeTasksForAssignment.contains(task.id())) {
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+ }
+ }
+ }
+
+ // Check for INVALID_STANDBY_TASK
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ // Check for MISSING_PROCESS_ID
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ return AssignmentError.MISSING_PROCESS_ID;
+ }
+ }
+
+ // Check for UNKNOWN_PROCESS_ID
+ final Set<ProcessId> clientsInInput =
clientStates.entrySet().stream().map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ for (final ProcessId processIdInOutput : clientsInOutput) {
+ if (!clientsInInput.contains(processIdInOutput)) {
+ return AssignmentError.UNKNOWN_PROCESS_ID;
+ }
+ }
+
+ // Check for UNKNOWN_TASK_ID
+ final Set<TaskId> taskIdsInInput =
applicationState.allTasks().stream().map(TaskInfo::id)
Review Comment:
Ah, changing the `ApplicationState#allTasks` method to return a `Map<TaskId,
TaskInfo>` would let us simplify this as well. Let's make that change, I'll
update the KIP.
Then we can remove this line altogether and move the `if
(!applicationState.allTasks().containsKey(task.id())) --> return
AssignmentError.UNKNOWN_TASK_ID;` check inside the single loop I outlined in a
comment above
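Roughly, the unknown-task check would then sit at the top of the inner loop. This is only a sketch under the assumption that `allTasks` becomes a map; the value type is reduced to a `Boolean`, ids are strings, and the enum is a trimmed-down stand-in:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnknownTaskCheckSketch {
    // Hypothetical, trimmed-down error enum for this sketch.
    enum AssignmentError { NONE, UNKNOWN_TASK_ID }

    // allTasks: stand-in for a map-shaped ApplicationState#allTasks result.
    // assignedTaskIdsPerClient: the task ids assigned to each client.
    static AssignmentError validate(final Map<String, Boolean> allTasks,
                                    final List<List<String>> assignedTaskIdsPerClient) {
        for (final List<String> taskIds : assignedTaskIdsPerClient) {
            for (final String taskId : taskIds) {
                // No separate set of input task ids is needed for the lookup.
                if (!allTasks.containsKey(taskId)) {
                    return AssignmentError.UNKNOWN_TASK_ID;
                }
                // The other per-task checks would follow here.
            }
        }
        return AssignmentError.NONE;
    }

    public static void main(final String[] args) {
        final Map<String, Boolean> allTasks = new HashMap<>();
        allTasks.put("0_0", true);
        System.out.println(validate(allTasks, Arrays.asList(Arrays.asList("0_0", "9_9"))));
    }
}
```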
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+
+ // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
+ final Set<TaskId> activeTasks = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() !=
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ continue;
+ }
+
+ if (activeTasks.contains(task.id())) {
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+ activeTasks.add(task.id());
+ }
+ }
+
+ // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> activeTasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksForAssignment.add(task.id());
+ }
+ }
+
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ if (activeTasksForAssignment.contains(task.id())) {
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+ }
+ }
+ }
+
+ // Check for INVALID_STANDBY_TASK
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ // Check for MISSING_PROCESS_ID
Review Comment:
nit: no need for the comments. "The code should stand for itself", as one of
the committers will always say. We typically prefer to limit inline comments to
things that explain "why" some code is doing something, not "what" some code
is doing