eduwercamacaro commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1939718078


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, 
CompletableFuture<StateUpdater.
                                    final Set<Task> 
tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> 
tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    
tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    
tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = 
removedTaskResult.exception().get();
+                        if (runtimeException instanceof 
TaskCorruptedException) {

Review Comment:
   @cadonna Thanks for these inputs. I believe we don't need to implement this 
exception handling in the `TaskManager.handleExceptionsFromStateUpdater` method 
because it is already being handled at StreamThread level. StateUpdater's 
exceptions get propagated to the StreamThread's exception handling and then 
decide whether to close the task clean or dirty. That's what I understand from 
the code, but maybe I'm wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to