guozhangwang commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788072458



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable 
throwable, final Set<Class<
     }
 
     private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+                                                final boolean 
skipThreadReplacement) {

Review comment:
       If we think that there will be other scenarios in the future where we 
want to override user's decision to skip replacing threads, then I feel this is 
okay. Otherwise I'm more inclined to just do it in a special condition as 
`decision == REPLACE THREAD && throwable instanceof 
MissingSourceTopicException`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable 
throwable, final Set<Class<
     }
 
     private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+                                                final boolean 
skipThreadReplacement) {
         final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
                     "The old handler will be ignored as long as a new handler 
is set.");
         }
         switch (action) {
             case REPLACE_THREAD:
-                log.error("Replacing thread in the streams uncaught exception 
handler", throwable);
-                replaceStreamThread(throwable);
+                if (!skipThreadReplacement) {
+                    log.error("Replacing thread in the streams uncaught 
exception handler", throwable);
+                    replaceStreamThread(throwable);
+                } else {
+                    log.debug("Skipping thread replacement for recoverable 
error");

Review comment:
       I'd suggest logging it as a warn instead, also we can be more explicit 
in the message like "user decides to replace threads, which is not necessary; 
the library overrides that decision and skip.. please consider changing your 
exception handler class to not do so.."

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -67,6 +67,9 @@
 
     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> 
builders; // Keep sorted by topology name for readability
 
+    // Handler for recoverable StreamsExceptions which don't require 
killing/replacing the thread
+    private java.util.function.Consumer<Throwable> 
recoverableStreamsExceptionHandler;
+

Review comment:
       +1

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable 
throwable, final Set<Class<
     }
 
     private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+                                                final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+                                                final boolean 
skipThreadReplacement) {

Review comment:
       Nevermind, discussed with @ableegoldman offline and we do want to keep 
it a general scheme.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to