ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1618035204


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -16,14 +16,63 @@
  */
 package org.apache.kafka.streams.processor.assignment;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.internals.assignment.Graph;
+import 
org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A set of utilities to help implement task assignment via the {@link 
TaskAssignor}
  */
 public final class TaskAssignmentUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TaskAssignmentUtils.class);
+
+    private TaskAssignmentUtils() {}
+
+    /**
+     * Return a "no-op" assignment that just copies the previous assignment of 
tasks to KafkaStreams clients
+     *
+     * @param applicationState the metadata and other info describing the 
current application state
+     *
+     * @return a new map containing an assignment that replicates exactly the 
previous assignment reported
+     *         in the applicationState
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
+        final ApplicationState applicationState
+    ) {
+        final Map<ProcessId, KafkaStreamsAssignment> assignments = new 
HashMap<>();
+        applicationState.kafkaStreamsStates(false).forEach((processId, state) 
-> {
+            final Set<AssignedTask> tasks = new HashSet<>();
+            state.previousActiveTasks().forEach(taskId -> {
+                tasks.add(new AssignedTask(taskId,
+                    AssignedTask.Type.ACTIVE));
+            });
+            state.previousStandbyTasks().forEach(taskId -> {
+                tasks.add(new AssignedTask(taskId,
+                    AssignedTask.Type.STANDBY));
+            });
+
+            final KafkaStreamsAssignment newAssignment = 
KafkaStreamsAssignment.of(processId, tasks);
+            assignments.put(processId, newAssignment);
+        });
+        return assignments;
+    }
+
     /**
      * Assign standby tasks to KafkaStreams clients according to the default 
logic.

Review Comment:
   Let's also go ahead and remove this utility method for now while we're at 
it, so that we don't have to remember to remove it later if we don't implement 
it before the 3.8 code freeze.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to