cadonna commented on code in PR #16300:
URL: https://github.com/apache/kafka/pull/16300#discussion_r1686395355


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {

Review Comment:
   For `process()`, Streams catches an `Exception`, whereas here it catches a 
`RuntimeException`. As far as I understand, the difference is that 
`RuntimeException` only catches unchecked exceptions, whereas `Exception` 
catches both unchecked and checked exceptions. Since Streams does not throw any 
checked exceptions from either `process()` or `punctuate()`, it is fine to use 
`RuntimeException` in both places.
   Could you change this for `process()` as well in this PR? 
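
The difference between the two catch clauses can be illustrated with a minimal, standalone sketch. `CatchScopeDemo` and `caughtBy` are hypothetical names used only for this illustration; they are not part of Kafka Streams.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class CatchScopeDemo {

    // Reports which catch clause handled the exception thrown by the action.
    // The RuntimeException clause comes first, so it takes every unchecked
    // exception; the Exception clause only sees what is left, i.e. checked ones.
    static String caughtBy(final Callable<Void> action) {
        try {
            action.call();
            return "none";
        } catch (final RuntimeException e) {
            return "RuntimeException";
        } catch (final Exception e) {
            return "Exception";
        }
    }

    public static void main(final String[] args) {
        // Unchecked exception: matched by catch (RuntimeException).
        System.out.println(caughtBy(() -> { throw new IllegalStateException("unchecked"); }));
        // Checked exception: only matched by catch (Exception).
        System.out.println(caughtBy(() -> { throw new IOException("checked"); }));
    }
}
```

If no checked exception can reach the `try` block, the `Exception` clause never adds anything over the `RuntimeException` clause, which is the argument for using `RuntimeException` in both places.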



##########
checkstyle/suppressions.xml:
##########
@@ -193,7 +193,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>

Review Comment:
   That is a pity 😞 ! But I do not see a quick fix to avoid the change. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
+                    recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);

Review Comment:
   nit:
   ```suggestion
               final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                   recordContext.topic(), 
                   recordContext.partition(), 
                   recordContext.offset(), 
                   recordContext.headers(), 
                   null, 
                   null, 
                   node.name(), 
                   id
               );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {
             throw e;
         } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(recordContext.topic(),
+                    recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id);
+
+            final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
+                    .handle(errorHandlerContext, null, e);
+
+            if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);

Review Comment:
   Could you add an error log here?
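
A rough sketch of what logging before the rethrow could look like. It uses `java.util.logging` and `IllegalStateException` purely as stand-ins so the example is self-contained; the actual code would presumably use the task's own logger and `StreamsException`. `LogBeforeFailDemo`, `failIfRequested`, and the local `Response` enum are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogBeforeFailDemo {

    // Stand-in logger; assumption: the real class would use its existing logger.
    private static final Logger LOG = Logger.getLogger(LogBeforeFailDemo.class.getName());

    // Hypothetical stand-in for ProcessingExceptionHandler.ProcessingHandlerResponse.
    enum Response { CONTINUE, FAIL }

    // Logs the failure with its cause before rethrowing when the handler asked to fail.
    static void failIfRequested(final Response response,
                                final String logPrefix,
                                final String nodeName,
                                final RuntimeException cause) {
        if (response == Response.FAIL) {
            final String message = String.format(
                "%sException caught while punctuating processor '%s'", logPrefix, nodeName);
            LOG.log(Level.SEVERE, message, cause);
            // Stand-in for: throw new StreamsException(message, cause);
            throw new IllegalStateException(message, cause);
        }
        // CONTINUE: swallow the exception and carry on.
    }

    public static void main(final String[] args) {
        failIfRequested(Response.CONTINUE, "task [0_0] ", "my-processor", new RuntimeException("boom"));
        System.out.println("continued after CONTINUE");
    }
}
```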



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -916,7 +920,17 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
         } catch (final StreamsException e) {

Review Comment:
   Now we have a weird situation: if you throw a `StreamsException` from 
`process()`, Streams handles it in the processing exception handler, but if you 
throw a `StreamsException` from `punctuate()`, Streams does not handle it.
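
The asymmetry can be sketched with stand-in types. Everything here is hypothetical and only mirrors the shape of the catch blocks as described in this review: the nested `StreamsException`, `Response`, and `Handler` are local stand-ins, not the Kafka Streams classes.

```java
public class HandlerRoutingDemo {

    // Local stand-in for the real StreamsException (an unchecked exception).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    enum Response { CONTINUE, FAIL }

    interface Handler {
        Response handle(Exception e);
    }

    // Handler that counts how often it is consulted and always continues.
    static class CountingHandler implements Handler {
        int calls = 0;

        @Override
        public Response handle(final Exception e) {
            calls++;
            return Response.CONTINUE;
        }
    }

    // punctuate()-style: a StreamsException is rethrown before the handler sees it.
    static void punctuateStyle(final Runnable body, final Handler handler) {
        try {
            body.run();
        } catch (final StreamsException e) {
            throw e;
        } catch (final RuntimeException e) {
            if (handler.handle(e) == Response.FAIL) {
                throw new StreamsException("punctuate failed", e);
            }
        }
    }

    // process()-style as described above: every exception is routed to the handler.
    static void processStyle(final Runnable body, final Handler handler) {
        try {
            body.run();
        } catch (final RuntimeException e) {
            if (handler.handle(e) == Response.FAIL) {
                throw new StreamsException("process failed", e);
            }
        }
    }

    public static void main(final String[] args) {
        final CountingHandler handler = new CountingHandler();
        processStyle(() -> { throw new StreamsException("from process", null); }, handler);
        System.out.println("handler calls after process(): " + handler.calls);
        try {
            punctuateStyle(() -> { throw new StreamsException("from punctuate", null); }, handler);
        } catch (final StreamsException e) {
            System.out.println("punctuate() rethrew without consulting the handler");
        }
    }
}
```

Either dropping the dedicated `StreamsException` clause in one place or adding it in the other would make the two paths behave the same; which direction is right is a decision for the PR.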



##########
streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##########
@@ -78,6 +82,7 @@ public void shouldFailWhenProcessingExceptionOccurs() {
             .map(KeyValue::new)
             .mapValues(value -> value)
             .process(runtimeErrorProcessorSupplierMock())
+            .process(runtimeErrorPunctuateProcessorSupplierMock())

Review Comment:
   You should not test `punctuate()` with the `TopologyTestDriver`. Method 
`punctuate()` is called outside of the topology, in the poll loop. With the 
`TopologyTestDriver`, you basically call `task.maybePunctuateSystemTime();` in 
`driver.advanceWallClockTime()`, which is a test-infrastructure helper method. 
This does not really test the integration of `ProcessingExceptionHandler` with 
`punctuate()` in the Streams runtime.
   Testing the integration of `ProcessingExceptionHandler` in `process()` with 
the `TopologyTestDriver` makes a little more sense, because you can test the 
case where a downstream processor throws and this should not affect an upstream 
processor, but that case does not exist for `punctuate()`. However, even for 
`process()`, a real integration test would be better.
   So, given the tight schedule, my proposal is to revert the changes regarding 
`punctuate()` in this file and only have unit tests. Then you can add a real 
integration test in a separate PR once we have merged all the rest.


