mjsax commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1927659866
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+            .filter(Objects::nonNull)
+            .forEach(removedTaskResult -> {
+                if (removedTaskResult.exception().isPresent()) {
+                    final RuntimeException runtimeException = removedTaskResult.exception().get();
+                    if (runtimeException instanceof TaskCorruptedException) {

Review Comment:

> `InterruptedException` exceptions are also possible, and I recommend wrapping this exception in a `StreamsException` too.

Might be good to get input from @cadonna on this question?

About `IllegalStateException`: I also think we should close dirty. It should not happen in the first place; if it does, it indicates a bug. If we lose the standby state when we hit a bug, I think that's acceptable.

Another thought: when we introduced internal handling of `TimeoutException`s from the client, we did a very detailed analysis of which exceptions could be thrown, for what reason, and how to handle each of them (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture#KafkaStreamsArchitecture-ExceptionHandling; it's of course somewhat outdated). I wonder to what extent we should do a similar analysis for the state-updater code, to reason about which exceptions we might get and how to group them reasonably.
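The wrapping suggested in the quoted comment could look roughly like the sketch below. It is illustrative only: `awaitRemoval` is a hypothetical helper, not the PR's `waitForFuture`; `StreamsException` is a local stand-in for Kafka's class so the snippet compiles on its own; and returning `null` on an `ExecutionException` (so a caller can drop it with `.filter(Objects::nonNull)`) is an assumption based on the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AwaitRemovalSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException,
    // so the sketch compiles without Kafka on the classpath.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: waits for a task-removal future.
    static <T> T awaitRemoval(final String taskId, final CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag, then surface the interruption as a StreamsException.
            Thread.currentThread().interrupt();
            throw new StreamsException("Interrupted while removing task " + taskId, e);
        } catch (final ExecutionException e) {
            // Assumption: a failed removal yields null and is filtered out by the caller;
            // a real implementation would likely log the cause here.
            return null;
        }
    }
}
```

Restoring the interrupt flag before wrapping keeps the interruption visible to code further up the stack instead of silently swallowing it.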
I agree that whitelisting individual exceptions in this part of the code would not be ideal. I think it would be much better to introduce some `CloseCleanStreamsException` that we check here, and to modify the state-updater code so that, where the exception happens, it decides whether or not to wrap it as a `CloseCleanStreamsException`. It's much easier to reason about "close clean" vs. "close dirty" at each place in the code where an error occurs than in a single centralized place in which we have no context about the root cause of the error. Thoughts?
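One way the proposed marker exception could work, as a minimal sketch: `CloseCleanStreamsException` does not exist in Kafka Streams, `RemovedTaskResult` is a simplified stand-in for `StateUpdater.RemovedTaskResult`, and `wrapAtSource` with its `stateStillConsistent` flag is a hypothetical placeholder for whatever judgment the state-updater code would make at each throw site.

```java
import java.util.List;
import java.util.Optional;

public class CloseCleanMarkerSketch {

    // Hypothetical marker exception named after the review proposal; not a Kafka Streams class.
    static class CloseCleanStreamsException extends RuntimeException {
        CloseCleanStreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Simplified stand-in for StateUpdater.RemovedTaskResult: a task id plus an optional error.
    static final class RemovedTaskResult {
        private final String taskId;
        private final Optional<RuntimeException> exception;

        RemovedTaskResult(final String taskId, final Optional<RuntimeException> exception) {
            this.taskId = taskId;
            this.exception = exception;
        }

        String taskId() { return taskId; }

        Optional<RuntimeException> exception() { return exception; }
    }

    // Hypothetical throw site in the state updater: the code that hits the error has the
    // context to judge whether the task's state is still safe, so it decides here.
    static RuntimeException wrapAtSource(final RuntimeException cause, final boolean stateStillConsistent) {
        return stateStillConsistent ? new CloseCleanStreamsException("Task can be closed clean", cause) : cause;
    }

    // Central check on the TaskManager side: no whitelist of exception types, only the marker.
    static void sortTasksToClose(final List<RemovedTaskResult> results,
                                 final List<String> closeClean,
                                 final List<String> closeDirty) {
        for (final RemovedTaskResult result : results) {
            final Optional<RuntimeException> exception = result.exception();
            if (!exception.isPresent() || exception.get() instanceof CloseCleanStreamsException) {
                closeClean.add(result.taskId());
            } else {
                // Anything unmarked, e.g. an IllegalStateException signalling a bug, closes dirty.
                closeDirty.add(result.taskId());
            }
        }
    }
}
```

With this split, supporting a new failure mode means deciding clean vs. dirty at the throw site rather than extending a type list in `TaskManager`.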