vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499019285



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    if (eh != null)  {
+                        thread.setStreamsUncaughtExceptionHandler(handler);
+                    } else {
+                        final StreamsUncaughtExceptionHandler defaultHandler = exception ->
+                                StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+                        thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+                    }
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                        "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e,
+                                                                                                                   final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged
+                break;
+            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                        "and the client is going to shut down: ", e);
+                for (final StreamThread streamThread: threads) {
+                    streamThread.shutdown();
+                }

Review comment:
       It seems safer to just call the nonblocking close method:
   ```suggestion
                   close(Duration.ZERO);
   ```
   That way, it'll properly set the state, stop the cleaner thread, etc.
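   For example, just a sketch of how that case might look with the suggested change (the surrounding switch, `log`, and `e` are from the quoted diff; `java.time.Duration` would need to be imported):
   ```java
               case SHUTDOWN_KAFKA_STREAMS_CLIENT:
                   log.error("Encountered the following exception during processing " +
                           "and the client is going to shut down: ", e);
                   // non-blocking close: transitions the state machine and stops the cleaner thread,
                   // rather than only asking each StreamThread to shut down
                   close(Duration.ZERO);
                   break;
   ```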

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -436,6 +496,8 @@ private void maybeSetError() {
             }
 
             if (setState(State.ERROR)) {
+                metrics.close();

Review comment:
       Actually, now that I have a more in-depth picture of what is going on, I disagree. I think we should leave these threads running until users actually call `KafkaStreams.close`, since they could alternatively add more threads (via KIP-663) to transition back from the ERROR state into RUNNING, at which point we'd be sorry that we had killed these threads, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -346,6 +347,9 @@ public void setStateListener(final KafkaStreams.StateListener listener) {
      *
      * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
      * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     *
+     * @Deprecated user {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead.

Review comment:
       ```suggestion
        * @Deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead.
   ```
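   For reference, the usual pattern pairs the lowercase `@deprecated` Javadoc tag with the `@Deprecated` annotation on the method itself; a sketch only, with the signature taken from the quoted file:
   ```java
       /**
        * @deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead.
        */
       @Deprecated
       public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
           // existing implementation unchanged
       }
   ```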

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {

Review comment:
       ```suggestion
       public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler uncaughtExceptionHandler) {
   ```
   
   We prefer to resist the urge to abbreviate, especially in the public-facing APIs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);

Review comment:
       Let's tweak this API to Throwable:
   ```suggestion
       StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception);
   ```
   
   Here's a good explanation of why: https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch
   
   The benefit is that we could handle `Error`s as well as `Exception`s. However, this comes with the obligation that we should not continue to use the thread after an Error occurs. I think we can deal with this restriction reasonably.
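   For example, a handler could then distinguish the two (a hypothetical user-side sketch; the response constants are the ones from the quoted interface):
   ```java
   // With Throwable, fatal Errors can be routed differently from ordinary Exceptions.
   // After an Error the thread must not be reused, so shut the whole client down in that case.
   final StreamsUncaughtExceptionHandler handler = throwable -> {
       if (throwable instanceof Error) {
           return StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
       }
       return StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
   };
   ```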

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);
+
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum StreamsUncaughtExceptionHandlerResponse {
+
+
+        SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+        REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),
+        SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+        SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
+
+
+        /** an english description of the api--this is for debugging and can change */
+        public final String name;
+
+        /** the permanent and immutable id of an API--this can't change ever */

Review comment:
       Thanks for clarifying this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.

Review comment:
       ```suggestion
        * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
        * throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they
        * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor
        * logic.
        * <p>
        * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any
        * thread that encounters such an exception.
   ```
   
   I think it's wrong to say that this is invoked when the thread abruptly terminates, because it's not. That's how the JVM handler works, but we're actually executing this handler while the thread is still running, and in fact that thread itself is what calls the handler.
   
   It also seemed appropriate to elaborate a little more on the usage of this method.
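   For example, a shared handler that keeps state needs something safe for concurrent use (a hypothetical user-side sketch; `kafkaStreams` and the failure-count policy are assumptions, the response constants come from the new interface, and `AtomicInteger` is `java.util.concurrent.atomic.AtomicInteger`):
   ```java
   // The same handler instance is installed on every stream thread, so any mutable
   // state it holds (like this failure counter) must be thread-safe.
   final AtomicInteger failures = new AtomicInteger();
   kafkaStreams.setUncaughtExceptionHandler(exception -> {
       if (failures.incrementAndGet() >= 3) {
           return StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
       }
       return StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
   });
   ```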

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
-            streams.setUncaughtExceptionHandler(null);
+            streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);

Review comment:
       Should we also test the behavior of setting the new handler to `null`? And actually, why is `null` allowed now, but it wasn't before?
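   If we do keep accepting `null`, something like this could cover it (a hypothetical test sketch, reusing the fixtures from the quoted test class):
   ```java
       @Test
       public void shouldAcceptNullStreamsUncaughtExceptionHandlerInCreatedState() {
           final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
           // per the quoted implementation, null falls back to the default handler
           // and should not throw while the instance is still in CREATED state
           streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null);
           streams.close();
       }
   ```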

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -54,6 +54,8 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
             log.error("Received error code {}", assignmentErrorCode.get());
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            streamThread.shutdown(); //TODO: 663 should set client to error if all streams are dead

Review comment:
       didn't quite follow this TODO

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);
+
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum StreamsUncaughtExceptionHandlerResponse {
+
+
+        SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+        REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),

Review comment:
       +1 I think we should at least comment it out until that other PR is actually in the codebase.
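   That is, roughly (a sketch of the quoted enum with the value disabled; the rest of the enum body is unchanged):
   ```java
           SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
           // REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"), // re-enable once 663 is merged
           SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
           SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
   ```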

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {

Review comment:
       Cool, so maybe this field should be called something else, like "subscription flag"?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Test
+    public void shouldSendShutDownSignal() throws Exception {
+        //
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final List<KeyValue<Object, Object>> processorValueCollector = new ArrayList<>();
+
+        builder.stream(inputTopic).process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        final Properties properties = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                        mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                        mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                        mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+                )
+        );
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION);

Review comment:
       Feel free to import `SHUTDOWN_KAFKA_STREAMS_APPLICATION` and save like 50 characters on this line ;)
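   Something like (a sketch, assuming a static import at the top of the test file):
   ```java
   import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION;
   
   // ... inside the test body:
   kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_KAFKA_STREAMS_APPLICATION);
   ```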

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -529,8 +541,7 @@ public void run() {
                 }
             }
 
-            log.error("Encountered the following exception during processing " +
-                "and the thread is going to shut down: ", e);
+            handleStreamsUncaughtException(e);
             throw e;

Review comment:
       I'm not sure this is right, actually. If we wanted to use the current thread to send the "poison pill" subscription, it needs to keep running and call poll again.
   
   Maybe instead we should have a default implementation of the uncaughtExceptionHandler that invokes the legacy one and then returns `SHUTDOWN_STREAM_THREAD`, and then the implementation of that case in KafkaStreams would be to re-throw the exception.
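   Roughly what I have in mind (a sketch only; `oldHandler` is a hypothetical field holding the legacy `Thread.UncaughtExceptionHandler`, and the response constant is from the new interface):
   ```java
   // Default StreamsUncaughtExceptionHandler: delegate to the legacy handler if one was set,
   // then ask for SHUTDOWN_STREAM_THREAD, which KafkaStreams would implement by re-throwing.
   final StreamsUncaughtExceptionHandler defaultHandler = exception -> {
       if (oldHandler != null) {
           oldHandler.uncaughtException(Thread.currentThread(), exception);
       }
       return StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
   };
   ```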

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    if (eh != null)  {
+                        thread.setStreamsUncaughtExceptionHandler(handler);
+                    } else {
+                        final StreamsUncaughtExceptionHandler defaultHandler = exception ->
+                                StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+                        thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+                    }
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                        "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e,
+                                                                                                                   final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged

Review comment:
       Let's just comment out this whole enum value until 663 is implemented; that way we won't have a TODO hanging around in case 663 doesn't make the release for some reason.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    if (eh != null)  {
+                        thread.setStreamsUncaughtExceptionHandler(handler);
+                    } else {
+                        final StreamsUncaughtExceptionHandler defaultHandler = exception ->
+                                StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+                        thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+                    }
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                        "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e,
+                                                                                                                   final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged
+                break;
+            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                        "and the client is going to shut down: ", e);
+                for (final StreamThread streamThread: threads) {
+                    streamThread.shutdown();
+                }

Review comment:
       Oh, I forgot; the reason you're doing it this way is to transition to ERROR, not actually shut down, right?
   
   In that case, it seems pretty odd to call this option "shut down", since it doesn't actually _shut down_; it only kills all the threads, leaving the final "shut down" as an exercise for the user.
   
   If I recall correctly, the preference of the group was in favor of this behavior, in which case I'd advocate for a different name. Maybe just `STOP_STREAM_THREAD`, `STOP_ALL_STREAM_THREADS`, and `STOP_ALL_STREAM_THREADS_IN_CLUSTER`.
   
   I've been on the fence about whether I should leave this feedback or not, but decided to go ahead and pass it on to you because I just got confused by the names, despite having recently participated in that discussion. So it seems likely that users would also be confused and think that we're doing the wrong thing by not actually shutting down the client.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -550,6 +561,10 @@ void runLoop() {
         // until the rebalance is completed before we close and commit the tasks
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
+                if (shutdownRequested.get()) {
+                    sendShutdownRequest(shutdownTypeRequested);
+                    return;

Review comment:
       I can't tell; are we guaranteed to actually send the joinGroup request by now?
   
   Maybe it's safer to just keep running this loop until we get back the "you should shut down" response assignment. We _could/should_ add a condition into `runOnce` so that we don't actually process anything once we have `shutdownRequested` set.
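   Something along these lines (a sketch against the quoted `runLoop`; `shutdownRequested` and `shutdownTypeRequested` are the fields from the diff, and the existing try/catch around `runOnce()` is elided):
   ```java
           while (isRunning() || taskManager.isRebalanceInProgress()) {
               if (shutdownRequested.get()) {
                   // keep looping (and polling) so the rebalance that carries the shutdown
                   // request actually completes, instead of returning immediately
                   sendShutdownRequest(shutdownTypeRequested);
               }
               // runOnce() would need an internal guard to skip processing records
               // while shutdownRequested is set
               runOnce();
           }
   ```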




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

