ableegoldman commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1927963266


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {

Review Comment:
   Agree 100%. I know "classifying & cleaning up exceptions" has been on our
TODO list forever because it's such a huge goal, but I don't think it hurts to
start now, even if the result isn't perfect and we don't go back and
classify/fix everything that exists today. Especially with all the big changes
coming (e.g. KIP-1071), it would be good to introduce a close-clean/close-dirty
exception type and start using it going forward. I'm also interested to hear
from Bruno on this.
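
To make the suggestion concrete, here is a rough sketch of what a
close-clean/close-dirty exception type could look like. Everything in it is
hypothetical: `TaskCloseException`, `CloseMode`, `RemovedTask`, `classify` and
`sortByCloseMode` are illustrative names, not existing Kafka Streams classes,
and task ids are plain strings so the example stands alone. It assumes an
unclassified exception still means close-dirty, which matches the behavior of
the lines removed in the hunk above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

final class CloseModeSketch {

    /** Hypothetical marker for how a task should be closed after a failure. */
    public enum CloseMode { CLEAN, DIRTY }

    /** Hypothetical exception type that carries its own close mode. */
    public static class TaskCloseException extends RuntimeException {
        private final CloseMode closeMode;

        public TaskCloseException(final String message, final CloseMode closeMode) {
            super(message);
            this.closeMode = closeMode;
        }

        public CloseMode closeMode() {
            return closeMode;
        }
    }

    /** Simplified stand-in for the result of removing a task from the state updater. */
    public static final class RemovedTask {
        final String taskId;
        final Optional<RuntimeException> exception;

        public RemovedTask(final String taskId, final RuntimeException exceptionOrNull) {
            this.taskId = taskId;
            this.exception = Optional.ofNullable(exceptionOrNull);
        }
    }

    /** No exception closes clean; a classified exception decides; anything else closes dirty. */
    public static CloseMode classify(final Optional<RuntimeException> exception) {
        if (!exception.isPresent()) {
            return CloseMode.CLEAN;
        }
        final RuntimeException e = exception.get();
        if (e instanceof TaskCloseException) {
            return ((TaskCloseException) e).closeMode();
        }
        return CloseMode.DIRTY; // assumption: unclassified failures stay conservative
    }

    /** Sorts removed tasks into the two sets without per-exception instanceof checks. */
    public static void sortByCloseMode(final List<RemovedTask> removed,
                                       final Set<String> closeClean,
                                       final Set<String> closeDirty) {
        for (final RemovedTask result : removed) {
            if (classify(result.exception) == CloseMode.CLEAN) {
                closeClean.add(result.taskId);
            } else {
                closeDirty.add(result.taskId);
            }
        }
    }

    public static void main(final String[] args) {
        final Set<String> clean = new TreeSet<>();
        final Set<String> dirty = new TreeSet<>();
        sortByCloseMode(Arrays.asList(
            new RemovedTask("0_0", null),
            new RemovedTask("0_1", new TaskCloseException("corrupted", CloseMode.DIRTY))
        ), clean, dirty);
        System.out.println("clean=" + clean + " dirty=" + dirty);
    }
}
```

With a type like this, the call site only asks the exception how the task
should be closed, so newly classified exceptions would not need another
`instanceof` branch in `TaskManager`.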


