ableegoldman commented on code in PR #17761: URL: https://github.com/apache/kafka/pull/17761#discussion_r1927963266
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+            .filter(Objects::nonNull)
+            .forEach(removedTaskResult -> {
+                if (removedTaskResult.exception().isPresent()) {
+                    final RuntimeException runtimeException = removedTaskResult.exception().get();
+                    if (runtimeException instanceof TaskCorruptedException) {

Review Comment:
Agree 100% -- I know "classifying & cleaning up exceptions" has been on our TODO list forever because it's a huge goal, but I think it doesn't hurt to start now, even if it's not perfect and we don't go through and classify/fix everything that exists today. Especially in light of all the huge changes coming (eg KIP-1071) it would be good to just introduce a close-clean/close-dirty exception type and start using it going forward.

Also interested to hear from Bruno on this