wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797772309



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
       It would be much simpler but unfortunately its not as simple as we first 
thought. The producer has only one transaction, so the records of the good 
tasks are mixed in with the records of the failed task and there is no way to 
separate them. So we need to take the tasks that we know will fail and process 
all the other tasks without them. That way we continue making progress. We can 
take the failing tasks and backoff and retry again later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to