cadonna commented on code in PR #16300: URL: https://github.com/apache/kafka/pull/16300#discussion_r1686395355
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {

Review Comment:
   For `process()`, Streams catches an `Exception`, whereas here it catches a `RuntimeException`. As far as I understand, the difference is that `RuntimeException` only catches unchecked exceptions, whereas `Exception` catches both unchecked and checked exceptions. Since Streams does not throw any checked exceptions from either `process()` or `punctuate()`, it is fine to use `RuntimeException` in both places. Could you also change this for `process()` in this PR?

##########
checkstyle/suppressions.xml:
##########
@@ -193,7 +193,7 @@
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>

Review Comment:
   That is a pity 😞! But I do not see a quick fix to avoid the change.
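
To illustrate the `RuntimeException` versus `Exception` point from the first comment above, here is a minimal, self-contained sketch. The class `CatchScopeDemo` and the method `caughtBy` are hypothetical names used only for illustration; nothing here is taken from `StreamTask`. It shows that a `catch (RuntimeException e)` clause sees unchecked exceptions only, while a checked exception falls through to `catch (Exception e)`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class CatchScopeDemo {

    // Hypothetical helper: reports which catch clause handles whatever the action throws.
    static String caughtBy(final Callable<Void> action) {
        try {
            action.call(); // Callable.call() is declared to throw Exception, so checked exceptions are possible
            return "none";
        } catch (final RuntimeException e) {
            // Unchecked exceptions land here.
            return "RuntimeException";
        } catch (final Exception e) {
            // Checked exceptions are only caught by the broader clause.
            return "Exception";
        }
    }

    public static void main(final String[] args) {
        // prints "RuntimeException"
        System.out.println(caughtBy(() -> { throw new IllegalStateException("unchecked"); }));
        // prints "Exception"
        System.out.println(caughtBy(() -> { throw new IOException("checked"); }));
    }
}
```

If the guarded code cannot throw a checked exception, as the comment states for `process()` and `punctuate()`, the second clause is never reached and the narrower `RuntimeException` catch is sufficient.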
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
+                recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);

Review Comment:
   nit:
   ```suggestion
               final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                   recordContext.topic(),
                   recordContext.partition(),
                   recordContext.offset(),
                   recordContext.headers(),
                   null,
                   null,
                   node.name(),
                   id
               );
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
+                recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);
+
+            final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
+                .handle(errorHandlerContext, null, e);
+
+            if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);

Review Comment:
   Could you add an error log here?
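
A rough sketch of what logging before the rethrow could look like, written as a self-contained stand-in rather than against the real classes. Everything in it is an assumption made for illustration: `PunctuateFailureSketch`, `Response`, `Handler`, and `errorLog` are hypothetical, the handler takes only the exception (the handler in the diff also receives an `ErrorHandlerContext` and a record), a list replaces the logger, and `IllegalStateException` stands in for `StreamsException`.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuateFailureSketch {

    // Hypothetical stand-in for ProcessingExceptionHandler.ProcessingHandlerResponse.
    enum Response { CONTINUE, FAIL }

    // Hypothetical, simplified handler: it only receives the exception.
    interface Handler {
        Response handle(RuntimeException e);
    }

    // Collects error messages instead of calling a real logger, so the sketch has no dependencies.
    static final List<String> errorLog = new ArrayList<>();

    static void punctuate(final Runnable punctuator, final Handler handler, final String nodeName) {
        try {
            punctuator.run();
        } catch (final RuntimeException e) {
            if (handler.handle(e) == Response.FAIL) {
                final String message =
                    String.format("Exception caught while punctuating processor '%s'", nodeName);
                // Record the failure before rethrowing; in real code this would be an error-level log call.
                errorLog.add(message + ": " + e);
                // IllegalStateException is a stand-in for StreamsException in this sketch.
                throw new IllegalStateException(message, e);
            }
            // CONTINUE: the exception is dropped and punctuation carries on.
        }
    }

    public static void main(final String[] args) {
        punctuate(() -> { throw new IllegalArgumentException("boom"); }, e -> Response.CONTINUE, "node-1");
        System.out.println("errors logged after CONTINUE: " + errorLog.size());
        try {
            punctuate(() -> { throw new IllegalArgumentException("boom"); }, e -> Response.FAIL, "node-1");
        } catch (final IllegalStateException expected) {
            System.out.println("errors logged after FAIL: " + errorLog.size());
        }
    }
}
```

Logging only on the `FAIL` branch keeps the message tied to the case where the task actually stops; whether the `CONTINUE` branch should also log is left open here.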
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {

Review Comment:
   Now we have a weird situation: if you throw a `StreamsException` from `process()`, Streams will handle it in the processing handler, but if you throw a `StreamsException` from `punctuate()`, Streams will not handle it.

##########
streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##########
@@ -78,6 +82,7 @@ public void shouldFailWhenProcessingExceptionOccurs() {
             .map(KeyValue::new)
             .mapValues(value -> value)
             .process(runtimeErrorProcessorSupplierMock())
+            .process(runtimeErrorPunctuateProcessorSupplierMock())

Review Comment:
   You should not test punctuate with the `TopologyTestDriver`. Method `punctuate()` is called outside of the topology, in the poll loop. With the `TopologyTestDriver`, you basically call `task.maybePunctuateSystemTime();` in `driver.advanceWallClockTime()`, which is a test infrastructure helper method. This does not really test the integration of `ProcessingExceptionHandler` with `punctuate()` in the Streams runtime.

   Testing the integration of `ProcessingExceptionHandler` in `process()` with the `TopologyTestDriver` makes a little more sense, because you can test the case where a downstream processor throws and this should not affect an upstream processor, but you do not have that case with `punctuate()`. However, a real integration test would also be better for `process()`.

   So, given the tight schedule, my proposal is to revert the changes regarding `punctuate()` in this file and only have unit tests. Then you can add a real integration test in a separate PR once we have merged all the rest.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.