ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1618035204
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -16,14 +16,63 @@
*/
package org.apache.kafka.streams.processor.assignment;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.SortedSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
+import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.internals.assignment.Graph;
+import
org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
+import
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
+import
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A set of utilities to help implement task assignment via the {@link
TaskAssignor}
*/
public final class TaskAssignmentUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskAssignmentUtils.class);
+
+ private TaskAssignmentUtils() {}
+
+ /**
+ * Return a "no-op" assignment that just copies the previous assignment of
tasks to KafkaStreams clients
+ *
+ * @param applicationState the metadata and other info describing the
current application state
+ *
+ * @return a new map containing an assignment that replicates exactly the
previous assignment reported
+ * in the applicationState
+ */
+ public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
+ final ApplicationState applicationState
+ ) {
+ final Map<ProcessId, KafkaStreamsAssignment> assignments = new
HashMap<>();
+ applicationState.kafkaStreamsStates(false).forEach((processId, state)
-> {
+ final Set<AssignedTask> tasks = new HashSet<>();
+ state.previousActiveTasks().forEach(taskId -> {
+ tasks.add(new AssignedTask(taskId,
+ AssignedTask.Type.ACTIVE));
+ });
+ state.previousStandbyTasks().forEach(taskId -> {
+ tasks.add(new AssignedTask(taskId,
+ AssignedTask.Type.STANDBY));
+ });
+
+ final KafkaStreamsAssignment newAssignment =
KafkaStreamsAssignment.of(processId, tasks);
+ assignments.put(processId, newAssignment);
+ });
+ return assignments;
+ }
+
/**
* Assign standby tasks to KafkaStreams clients according to the default
logic.
Review Comment:
Let's also just go ahead and remove this utility method for now while we're
at it, so we don't have to remember to remove it later if we don't implement it
before the 3.8 code freeze
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]