mjsax commented on code in PR #17761:
URL: https://github.com/apache/kafka/pull/17761#discussion_r1927659866


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -731,15 +732,20 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
                                    final Set<Task> tasksToCloseCleanFromStateUpdater,
                                    final Set<Task> tasksToCloseDirtyFromStateUpdater) {
         futures.entrySet().stream()
-            .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
-            .filter(Objects::nonNull)
-            .forEach(removedTaskResult -> {
-                if (removedTaskResult.exception().isPresent()) {
-                    tasksToCloseDirtyFromStateUpdater.add(removedTaskResult.task());
-                } else {
-                    tasksToCloseCleanFromStateUpdater.add(removedTaskResult.task());
-                }
-            });
+                .map(entry -> waitForFuture(entry.getKey(), entry.getValue()))
+                .filter(Objects::nonNull)
+                .forEach(removedTaskResult -> {
+                    if (removedTaskResult.exception().isPresent()) {
+                        final RuntimeException runtimeException = removedTaskResult.exception().get();
+                        if (runtimeException instanceof TaskCorruptedException) {

Review Comment:
   > InterruptedException exceptions are also possible, and I recommend wrapping this exception in a StreamsException too.
   
   Might be good to get input from @cadonna about this question?
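   A minimal sketch of the wrapping suggested in the quote, assuming a helper that blocks on `CompletableFuture.get()`. The name `awaitRemoval` is hypothetical and only stands in for the `waitForFuture` helper from the diff, whose real implementation is not shown here. `StreamsException` is a local stand-in for `org.apache.kafka.streams.errors.StreamsException` so the snippet compiles without Kafka on the classpath, and wrapping the cause of an `ExecutionException` as well is an assumption made only because `get()` declares both checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class WaitForFutureSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException,
    // which is likewise an unchecked exception.
    static final class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Blocks until the removal future completes and converts the checked
    // exceptions declared by get() into the unchecked StreamsException.
    static <T> T awaitRemoval(final String taskId, final CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (final InterruptedException e) {
            // get() clears the interrupt flag when it throws; restore it so
            // callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new StreamsException("Interrupted while waiting for removal of task " + taskId, e);
        } catch (final ExecutionException e) {
            // Assumption: surface the original failure as the cause.
            throw new StreamsException("Removal of task " + taskId + " failed", e.getCause());
        }
    }
}
```

   Restoring the interrupt flag before wrapping is the usual convention when a checked `InterruptedException` is turned into an unchecked exception; otherwise the interruption would be silently lost.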
   
   About `IllegalStateException`, I also think we should close dirty. It should not happen in the first place, and if it does, it indicates a bug. Losing the standby state when we hit a bug seems acceptable to me.
   
   Another thought: when we introduced internal handling of `TimeoutException`s from the client, we did a very detailed analysis of which exceptions could be thrown, for what reason, and how to handle them (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture#KafkaStreamsArchitecture-ExceptionHandling -- it's of course somewhat outdated). I wonder to what extent we should do a similar analysis for the state-updater code, to reason about which exceptions we might get and how to group them reasonably. I agree that whitelisting individual exceptions in this part of the code would not be ideal. I think it would be much better to introduce some "CloseCleanStreamsException" that we check here, and to modify the state-updater code so that it decides, at the place where the exception happens, whether or not to wrap it as a "CloseCleanStreamsException". It's much easier to reason about "close clean" vs. "close dirty" throughout the code than in a single centralized place where we have no context information about the root cause of the error.
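   A minimal sketch of the proposal, assuming the state updater makes the clean-vs-dirty call at the throw site. `CloseCleanStreamsException` is only the name floated in this comment (it does not exist in Kafka), and `Task`, `RemovedTaskResult`, `markCleanIfSafe`, and `splitCleanAndDirty` are hypothetical, simplified stand-ins rather than the real `TaskManager` or state-updater types. Anything not explicitly marked, including an `IllegalStateException`, falls through to a dirty close.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class CloseCleanSketch {

    // Hypothetical marker exception (not part of Kafka): created by the code that
    // detects the error, which is the only place that knows the root cause.
    static final class CloseCleanStreamsException extends RuntimeException {
        CloseCleanStreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Simplified stand-ins for the real Task and removed-task result types.
    static final class Task {
        final String id;

        Task(final String id) {
            this.id = id;
        }
    }

    static final class RemovedTaskResult {
        final Task task;
        final Optional<RuntimeException> exception;

        RemovedTaskResult(final Task task, final RuntimeException exceptionOrNull) {
            this.task = task;
            this.exception = Optional.ofNullable(exceptionOrNull);
        }
    }

    // Example throw-site decision inside the (hypothetical) state-updater code:
    // wrap the error as "close clean" only when local state is known to be consistent.
    static RuntimeException markCleanIfSafe(final RuntimeException error, final boolean stateIsConsistent) {
        return stateIsConsistent
            ? new CloseCleanStreamsException("Safe to close clean: " + error.getMessage(), error)
            : error;
    }

    // Central check in TaskManager: no whitelist of individual exception types,
    // only the marker. Any other exception (e.g. IllegalStateException) means dirty.
    static void splitCleanAndDirty(final List<RemovedTaskResult> results,
                                   final Set<Task> closeClean,
                                   final Set<Task> closeDirty) {
        for (final RemovedTaskResult result : results) {
            final boolean clean = result.exception
                .map(e -> e instanceof CloseCleanStreamsException)
                .orElse(true); // no exception at all -> clean close
            if (clean) {
                closeClean.add(result.task);
            } else {
                closeDirty.add(result.task);
            }
        }
    }
}
```

   Checking only the top-level type keeps the central check trivial; the trade-off is that every throw site in the state updater has to make the clean-vs-dirty decision explicitly, which is exactly where the context about the root cause is available.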
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
