loicgreffier commented on code in PR #16300:
URL: https://github.com/apache/kafka/pull/16300#discussion_r1697703902


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {

Review Comment:
   @mjsax NVM, in the following case:
   
   ```java
   builder
     .stream(...)
     .process(...) // punctuate() calls context.forward() here
     .map((key, value) -> throw new RuntimeException());
   ```
   
   RuntimeException is being caught and handled in the `ProcessorNode#process`, 
then thrown as a `FailedProcessingException` and caught in the 
`StreamTask#punctuate`. 
   
   In `punctuate`, So I think we should:
   - catch `FailedProcessingException` coming from nodes, unwrap it and throw a 
new StreamsException ➡️ It avoids passing the exception through the processing 
exception handler twice.
   - catch `TaskCorruptedException | TaskMigratedException` and throw it as is 
➡️ We do not want to handle them / wrap them to anything else.
   - catch `Exception`, apply the processing exception handler and throw 
StreamsException ➡️ We handle anything else is not coming from a processor node.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to