loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1641084096
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java: ########## @@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child, final Record<K, V> record) { setCurrentNode(child); - child.process(record); + try { Review Comment: @cadonna PR has been updated from https://github.com/apache/kafka/pull/16137. 1. We register the `processingExceptionHandler` in `InternalTopologyBuilder#rewriteTopology`. The streams config is available already. It does not impact either `InternalTopologyBuilder#buildTopology` or `InternalTopologyBuilder#buildSubtopology`, and it deals with Topology Test Driver setup as well. 2. When catching processing errors in `ProcessorNode#process`, we have to deal with exceptions being thrown by a child node then caught, handled and rethrown by each parent node of the node chain. To deal with this case, the child node throws a `StreamsException`, and let each parent just catches and rethrows any `StreamsException` as is: https://github.com/apache/kafka/pull/16093/files#diff-a961c8e4e1005027be9de9500bed2c7ee66b318132ebff3d7d62df81b5856198R192 The drawback is exceptions of type `StreamsException` won't be handled by the processing exception handler. Thus, if a user manually throws a `StreamsException` in one of its processor nodes, it won't be handled. ➡️ This makes me wonder if we should deal with this scenario, and maybe introduce a brand-new internal exception (e.g., `ProcessingStreamsException`) to not mess up with `StreamsException`. Let us your toughts about point 1 and 2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org