loicgreffier commented on code in PR #16300:
URL: https://github.com/apache/kafka/pull/16300#discussion_r1697703902
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {

Review Comment:
   @mjsax NVM, in the following case:

   ```java
   builder
     .stream(...)
     .process(...) // punctuate() calls context.forward() here
     .map((key, value) -> { throw new RuntimeException(); });
   ```

   The `RuntimeException` is caught and handled in `ProcessorNode#process`, rethrown as a `FailedProcessingException`, and then caught in `StreamTask#punctuate`.

   So in `punctuate`, I think we should:
   - catch `FailedProcessingException` coming from nodes, unwrap it, and throw a new `StreamsException` ➡️ This avoids passing the exception through the processing exception handler twice.
   - catch `TaskCorruptedException | TaskMigratedException` and rethrow them as is ➡️ We do not want to handle them or wrap them in anything else.
   - catch `Exception`, apply the processing exception handler, and throw a `StreamsException` ➡️ This handles anything else that does not come from a processor node.
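
To make the three catch branches concrete, here is a minimal, self-contained sketch of the ordering proposed above. It is not the actual `StreamTask` code: the exception classes are hypothetical stand-ins that only mirror the names used in this thread, `handle` is a hypothetical placeholder for the processing exception handler (it only records its argument and always lets the failure propagate), and `processNode` is a simplified stand-in for what `ProcessorNode#process` does.

```java
import java.util.ArrayList;
import java.util.List;

final class PunctuateSketch {
    // Hypothetical stand-ins for the Kafka Streams exception types; not the real classes.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class FailedProcessingException extends StreamsException {
        FailedProcessingException(final Throwable cause) {
            super("processing failed", cause);
        }
    }

    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException() {
            super("task corrupted", null);
        }
    }

    static class TaskMigratedException extends StreamsException {
        TaskMigratedException() {
            super("task migrated", null);
        }
    }

    // Records every exception passed to the simulated processing exception handler.
    static final List<Throwable> handled = new ArrayList<>();

    // Placeholder handler (assumption): records the error and never swallows it.
    static void handle(final Throwable error) {
        handled.add(error);
    }

    // Simplified stand-in for ProcessorNode#process: handle once, then wrap.
    static void processNode(final Runnable userCode) {
        try {
            userCode.run();
        } catch (final RuntimeException e) {
            handle(e);
            throw new FailedProcessingException(e);
        }
    }

    // The proposed catch ordering for punctuate().
    static void punctuate(final Runnable punctuator) {
        try {
            punctuator.run();
        } catch (final FailedProcessingException e) {
            // Already handled in a downstream node: unwrap, do not call the handler again.
            throw new StreamsException("punctuate failed", e.getCause());
        } catch (final TaskCorruptedException | TaskMigratedException e) {
            // Rethrow untouched: neither handled nor wrapped.
            throw e;
        } catch (final Exception e) {
            // Anything else did not come from a processor node: handle it here, then wrap.
            handle(e);
            throw new StreamsException("punctuate failed", e);
        }
    }

    public static void main(final String[] args) {
        try {
            // The punctuator forwards to a downstream node whose user code throws.
            punctuate(() -> processNode(() -> {
                throw new IllegalStateException("boom");
            }));
        } catch (final StreamsException e) {
            System.out.println("cause: " + e.getCause().getClass().getSimpleName());
        }
        System.out.println("handler calls: " + handled.size());
    }
}
```

In this sketch the handler is called once for the downstream failure, because the `FailedProcessingException` branch unwraps without calling `handle` again. Without that branch, the generic `catch (Exception)` would pass the same failure through the handler a second time.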