cadonna commented on code in PR #17711:
URL: https://github.com/apache/kafka/pull/17711#discussion_r1846148048


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -573,16 +581,23 @@ public void flushCache() {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by Streams itself
-                        if (exception instanceof StreamsException) {
+                        // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException) {
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush state store %s", logPrefix, store.name()),

Review Comment:
   This should be `%sFailed to flush cache of store %s` as on line 593.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -618,13 +633,20 @@ public void close() throws ProcessorStateException {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by Streams itself
-                        if (exception instanceof StreamsException)
+                        // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush state store %s", logPrefix, store.name()),
+                                exception.getCause());
+                        else if (exception instanceof StreamsException)
                             firstException = exception;
                         else
                             firstException = new ProcessorStateException(
                                 format("%sFailed to close state store %s", logPrefix, store.name()), exception);
+                        log.error("Failed to flush cache of store {}: ", store.name(), firstException);
+                    } else {
+                        log.error("Failed to flush cache of store {}: ", store.name(), exception);

Review Comment:
   These two log messages should be `Failed to close state store {}:` as the removed `log.error()` says.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -771,6 +772,38 @@ public void close() {
         assertEquals(exception, thrown);
     }
 
+    @Test
+    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException() {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
+        final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void flush() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flush);
+        assertEquals(exception, thrown.getCause());

Review Comment:
   Could you also verify the stack trace here? Something like:
   ```java
   assertFalse(Arrays.stream(thrown.getStackTrace())
       .anyMatch(element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
   ```
   IMO, we should not pollute the stack trace with internal exceptions that we only need in order to skip the exception handler.
   
   This also applies to the other added tests.
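   
   `Throwable.getStackTrace()` returns only the frames of the exception it is called on, and each cause carries its own frames. A complementary check could therefore walk the cause chain. The following is a minimal, self-contained sketch of that idea, not the code in this PR: the nested exception classes are hypothetical stand-ins for the Kafka Streams classes, and `wrapFlushFailure` and `causeChainContains` are illustrative helper names.

   ```java
   public class CauseChainSketch {

       // Hypothetical stand-in for the internal FailedProcessingException.
       static class FailedProcessingException extends RuntimeException {
           FailedProcessingException(final String processorName, final Throwable cause) {
               super("Failed processing in " + processorName, cause);
           }
       }

       // Hypothetical stand-in for ProcessorStateException.
       static class ProcessorStateException extends RuntimeException {
           ProcessorStateException(final String message, final Throwable cause) {
               super(message, cause);
           }
       }

       // Simplified version of the branch under review: when the failure is the
       // internal wrapper, pass its cause along so the wrapper itself is dropped.
       static ProcessorStateException wrapFlushFailure(final String storeName,
                                                       final RuntimeException exception) {
           final String message = String.format("Failed to flush state store %s", storeName);
           if (exception instanceof FailedProcessingException) {
               return new ProcessorStateException(message, exception.getCause());
           }
           return new ProcessorStateException(message, exception);
       }

       // Returns true if the throwable or any of its causes is an instance of the given type.
       static boolean causeChainContains(final Throwable thrown,
                                         final Class<? extends Throwable> type) {
           Throwable current = thrown;
           while (current != null) {
               if (type.isInstance(current)) {
                   return true;
               }
               current = current.getCause();
           }
           return false;
       }

       public static void main(final String[] args) {
           final RuntimeException root = new RuntimeException("KABOOM!");
           final ProcessorStateException thrown =
               wrapFlushFailure("store", new FailedProcessingException("processor", root));

           // The root cause is preserved and the internal wrapper is gone from the chain.
           System.out.println(thrown.getCause() == root);
           System.out.println(causeChainContains(thrown, FailedProcessingException.class));
       }
   }
   ```

   In the real test, a helper like `causeChainContains` would be applied to the `thrown` exception next to the existing `assertEquals(exception, thrown.getCause())`.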



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -538,13 +539,20 @@ public void flush() {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by Streams itself
-                        if (exception instanceof StreamsException)
+                        // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush state store %s", logPrefix, store.name()),
+                                exception.getCause());
+                        else if (exception instanceof StreamsException)
                             firstException = exception;
                         else
                             firstException = new ProcessorStateException(
                                 format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
+                        log.error("Failed to flush cache of store {}: ", store.name(), firstException);
+                    } else {
+                        log.error("Failed to flush cache of store {}: ", store.name(), exception);

Review Comment:
   These two log messages should be `Failed to flush state store {}: ` as the removed `log.error()` says.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -618,13 +633,20 @@ public void close() throws ProcessorStateException {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by Streams itself
-                        if (exception instanceof StreamsException)
+                        // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush state store %s", logPrefix, store.name()),

Review Comment:
   This should be `%sFailed to close state store %s` as on line 645.


