guozhangwang commented on a change in pull request #11686: URL: https://github.com/apache/kafka/pull/11686#discussion_r788072458
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class< } private void handleStreamsUncaughtException(final Throwable throwable, - final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, + final boolean skipThreadReplacement) { Review comment: If we think that there will be other scenarios in the future where we want to override user's decision to skip replacing threads, then I feel this is okay. Otherwise I'm more inclined to just do it in a special condition as `decision == REPLACE THREAD && throwable instanceof MissingSourceTopicException`. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class< } private void handleStreamsUncaughtException(final Throwable throwable, - final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, + final boolean skipThreadReplacement) { final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler); if (oldHandler) { log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + "The old handler will be ignored as long as a new handler is set."); } switch (action) { case REPLACE_THREAD: - log.error("Replacing thread in the streams uncaught exception handler", throwable); - replaceStreamThread(throwable); + if (!skipThreadReplacement) { + log.error("Replacing thread in the streams uncaught exception handler", throwable); + replaceStreamThread(throwable); + } else { + log.debug("Skipping thread replacement for recoverable error"); Review comment: I'd suggest logging it as a warn instead, also we can be more explicit in the message like "user decides to replace threads, which is not necessary; the library overrides that decision and skip.. please consider changing your exception handler class to not do so.." ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -67,6 +67,9 @@ private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability + // Handler for recoverable StreamsExceptions which don't require killing/replacing the thread + private java.util.function.Consumer<Throwable> recoverableStreamsExceptionHandler; + Review comment: +1 ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class< } private void handleStreamsUncaughtException(final Throwable throwable, - final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, + final boolean skipThreadReplacement) { Review comment: Nevermind, discussed with @ableegoldman offline and we do want to keep it a general scheme. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org