wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r791937041



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {

Review comment:
       I am not sure we want to move all the tasks in a topology? Maybe we can 
do that by task or sub topology?
   
   maybe topology is best but I will need to think about it a bit

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
       It might be nice to keep the tasks/topologies that have failed in 
another list entirely. Then when reprocessing after an exception we can run all 
the good tasks first and commit them before running the failures. This will be 
important for EOS as we con't commit only part of a transaction. 
   
   The larger part of that doesn't need to be done it this PR but keeping the 
groups separate would be nice in my mind




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to