eduwercamacaro commented on code in PR #17761: URL: https://github.com/apache/kafka/pull/17761#discussion_r1939718078
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater. final Set<Task> tasksToCloseCleanFromStateUpdater, final Set<Task> tasksToCloseDirtyFromStateUpdater) { futures.entrySet().stream() - .map(entry -> waitForFuture(entry.getKey(), entry.getValue())) - .filter(Objects::nonNull) - .forEach(removedTaskResult -> { - if (removedTaskResult.exception().isPresent()) { - tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task()); - } else { - tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task()); - } - }); + .map(entry -> waitForFuture(entry.getKey(), entry.getValue())) + .filter(Objects::nonNull) + .forEach(removedTaskResult -> { + if (removedTaskResult.exception().isPresent()) { + final RuntimeException runtimeException = removedTaskResult.exception().get(); + if (runtimeException instanceof TaskCorruptedException) { Review Comment: @cadonna Thanks for these inputs. I believe we don't need to implement this exception handling in the `TaskManager.handleExceptionsFromStateUpdater` method because it is already being handled at StreamThread level. StateUpdater's exceptions get propagated to the StreamThread's exception handling and then decide whether to close the task clean or dirty. That's what I understand from the code, but maybe I'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org