vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499019285
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } + /** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ + public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { + final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh); + synchronized (stateLock) { + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + if (eh != null) { + thread.setStreamsUncaughtExceptionHandler(handler); + } else { + final StreamsUncaughtExceptionHandler defaultHandler = exception -> + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD; + thread.setStreamsUncaughtExceptionHandler(defaultHandler); + } + } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + + "Current state is: " + state); + } + } + } + + private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e); + switch (action) { + case SHUTDOWN_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); + break; + case REPLACE_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged + break; + case SHUTDOWN_KAFKA_STREAMS_CLIENT: + log.error("Encountered the following exception during processing " + + "and the client is going to shut down: ", e); + for (final StreamThread streamThread: threads) { + streamThread.shutdown(); + } Review comment: It seems safer to just call the nonblocking close method: ```suggestion close(Duration.ZERO); ``` That way, it'll properly set the state, stop the cleaner thread, etc. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { + metrics.close(); Review comment: Actually, now that I have a more in-depth picture of what is going on, I disagree. I think we should leave these threads running until users actually call `KafkaStreams.close`, since the could alternatively add more threads (via KIP-633) to transition back from ERROR state into RUNNING, at which point, we'll be sorry that we killed these threads, right? 
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -346,6 +347,9 @@ public void setStateListener(final KafkaStreams.StateListener listener) { * * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * + * @Deprecated user {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead. Review comment: ```suggestion * @Deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead. ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } + /** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ + public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { Review comment: ```suggestion public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler uncaughtExceptionHandler) { ``` We prefer to resist the urge to abbreviate, especially in the public-facing APIs. ########## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { + /** + * Inspect a record and the exception received. + * @param exception the actual exception + */ + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); Review comment: Let's tweak this API to Throwable: ```suggestion StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Throwable exception); ``` Here's a good explanation of why: https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch The benefit is that we could handle `Error`s as well as `Exception`s. However, this comes with the obligation that we should not continue to use the thread after an Error occurs. I think we can deal with this restriction reasonably. ########## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { + /** + * Inspect a record and the exception received. + * @param exception the actual exception + */ + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); + + /** + * Enumeration that describes the response from the exception handler. + */ + enum StreamsUncaughtExceptionHandlerResponse { + + + SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"), + REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"), + SHUTDOWN_KAFKA_STREAMS_CLIENT(2, "SHUTDOWN_KAFKA_STREAMS_CLIENT"), + SHUTDOWN_KAFKA_STREAMS_APPLICATION(3, "SHUTDOWN_KAFKA_STREAMS_APPLICATION"); + + + /** an english description of the api--this is for debugging and can change */ + public final String name; + + /** the permanent and immutable id of an API--this can't change ever */ Review comment: Thanks for clarifying this. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } + /** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. 
Review comment: ```suggestion * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} * throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor * logic. * <p> * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any * thread that encounters such an exception. ``` I think it's wrong to say that this is invoked when the thread abruptly terminates, because it's not. That's how the JVM handler works, but we're actually executing this handler while the thread is still running, and in fact that thread itself is what calls the handler. It also seemed appropriate to elaborate a little more on the usage of this method. ########## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ########## @@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); try { - streams.setUncaughtExceptionHandler(null); + streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null); Review comment: Should we also test the behavior of setting the new handler to `null`? And actually, why is `null` allowed now, but it wasn't before? 
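As a rough illustration of the thread-safety note in the suggested Javadoc, the sketch below shares one handler among several threads and calls it from the failing thread itself, while that thread is still running. The `Handler` interface and `Response` enum are local stand-ins written for this example, not the types in the PR, and mapping an `Error` to a client-level shutdown is an arbitrary choice made only to show why accepting `Throwable` matters.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the handler types under review; not the Kafka Streams API.
final class ThreadSafeHandlerSketch {
    enum Response { SHUTDOWN_STREAM_THREAD, SHUTDOWN_KAFKA_STREAMS_CLIENT }

    interface Handler {
        Response handle(Throwable throwable);
    }

    // One instance is shared by every worker, so its mutable state is atomic.
    static final class CountingHandler implements Handler {
        private final AtomicInteger failures = new AtomicInteger();

        @Override
        public Response handle(final Throwable throwable) {
            failures.incrementAndGet();
            // Assumption for this sketch: after an Error, stop the whole client
            // rather than keep using any thread.
            return throwable instanceof Error
                ? Response.SHUTDOWN_KAFKA_STREAMS_CLIENT
                : Response.SHUTDOWN_STREAM_THREAD;
        }

        int failures() {
            return failures.get();
        }
    }

    // Starts threadCount workers that each fail once and invoke the shared
    // handler from the failing thread, then returns the recorded failure count.
    static int runWorkers(final int threadCount) {
        final CountingHandler handler = new CountingHandler();
        final Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    throw new NullPointerException("simulated processor bug");
                } catch (final Throwable t) {
                    handler.handle(t); // the thread is still alive at this point
                }
            });
            workers[i].start();
        }
        for (final Thread worker : workers) {
            try {
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handler.failures();
    }

    public static void main(final String[] args) {
        System.out.println("failures handled: " + runWorkers(4));
    }
}
```

A handler that kept its count in a plain `int` field would compile just as well, but concurrent increments could be lost, which is the kind of bug the Javadoc note is meant to head off.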
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java ########## @@ -54,6 +54,8 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { log.error("Received error code {}", assignmentErrorCode.get()); throw new MissingSourceTopicException("One or more source topics were missing during rebalance"); + } else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) { + streamThread.shutdown(); //TODO: 663 should set client to error if all streams are dead Review comment: didn't quite follow this TODO ########## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { + /** + * Inspect a record and the exception received. 
+ * @param exception the actual exception + */ + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); + + /** + * Enumeration that describes the response from the exception handler. + */ + enum StreamsUncaughtExceptionHandlerResponse { + + + SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"), + REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"), Review comment: +1 I think we should at least comment it out until that other PR is actually in the codebase. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ########## @@ -82,11 +83,21 @@ public SubscriptionInfo(final int version, final UUID processId, final String userEndPoint, final Map<TaskId, Long> taskOffsetSums) { + this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0)); + } + + public SubscriptionInfo(final int version, + final int latestSupportedVersion, + final UUID processId, + final String userEndPoint, + final Map<TaskId, Long> taskOffsetSums, + final AtomicInteger shutdownRequested) { Review comment: Cool, so maybe this field should be called something else, like "subscription flag"? ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static 
org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + + +@Category(IntegrationTest.class) +public class AppShutdownIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @Test + public void shouldSendShutDownSignal() throws Exception { + // + // + // Also note that this is an integration test because so many components have to come together to + // ensure these configurations wind up where they belong, and any number of future code changes + // could break this change. + + final String testId = safeUniqueTestName(getClass(), testName); + final String appId = "appId_" + testId; + final String inputTopic = "input" + testId; + + IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + + final StreamsBuilder builder = new StreamsBuilder(); + + final List<KeyValue<Object, Object>> processorValueCollector = new ArrayList<>(); + + builder.stream(inputTopic).process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process")); + + final Properties properties = mkObjectProperties( + mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), + mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"), + mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"), + mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"), + mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000") + ) + ); + + try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { + final CountDownLatch latch = new CountDownLatch(1); + kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION); Review comment: Feel free to import 
`SHUTDOWN_KAFKA_STREAMS_APPLICATION` and save like 50 characters on this line ;) ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -529,8 +541,7 @@ public void run() { } } - log.error("Encountered the following exception during processing " + - "and the thread is going to shut down: ", e); + handleStreamsUncaughtException(e); throw e; Review comment: I'm not sure this is right, actually. If we wanted to use the current thread to send the "poison pill" subscription, it needs to keep running and call poll again. Maybe instead we should have a default implementation of the uncaughtExceptionHandler that invokes the legacy one and then returns `SHUTDOWN_STREAM_THREAD`, and then the implementation of that case in KafkaStreams would be to re-throw the exception. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } + /** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. 
+ */ + public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { + final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh); + synchronized (stateLock) { + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + if (eh != null) { + thread.setStreamsUncaughtExceptionHandler(handler); + } else { + final StreamsUncaughtExceptionHandler defaultHandler = exception -> + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD; + thread.setStreamsUncaughtExceptionHandler(defaultHandler); + } + } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + + "Current state is: " + state); + } + } + } + + private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e); + switch (action) { + case SHUTDOWN_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); + break; + case REPLACE_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged Review comment: Let's just comment out this whole enum value until 663 is implemented, that way we won't have a TODO hanging around in case 663 doesn't make the release for some reason. 
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } + /** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ + public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { + final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh); + synchronized (stateLock) { + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + if (eh != null) { + thread.setStreamsUncaughtExceptionHandler(handler); + } else { + final StreamsUncaughtExceptionHandler defaultHandler = exception -> + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD; + thread.setStreamsUncaughtExceptionHandler(defaultHandler); + } + } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. 
" + + "Current state is: " + state); + } + } + } + + private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { + final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e); + switch (action) { + case SHUTDOWN_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); + break; + case REPLACE_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged + break; + case SHUTDOWN_KAFKA_STREAMS_CLIENT: + log.error("Encountered the following exception during processing " + + "and the client is going to shut down: ", e); + for (final StreamThread streamThread: threads) { + streamThread.shutdown(); + } Review comment: Oh, I forgot; the reason you're doing it this way is to transition to ERROR, not actually shut down, right? In that case, it seems pretty odd to call this option "shut down", since it doesn't actually _shut down_, it only kills all the threads, leaving the final "shut down" as an exercise to the user. If I recall correctly, the preference of the group was in favor of this behavior, in which case, I'd advocate for a different name. Maybe just `STOP_STREAM_THREAD`, `STOP_ALL_STREAM_THREADS`, and `STOP_ALL_STREAM_THREADS_IN_CLUSTER`. I've been on the fence about whether I should leave this feedback or not, but decided to go ahead and pass it on to you because I just got confused by the names, despite having recently participating in that discussion. So it seems likely that users would also be confused and think that we're doing the wrong thing by not actually shutting down the client. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -550,6 +561,10 @@ void runLoop() { // until the rebalance is completed before we close and commit the tasks while (isRunning() || taskManager.isRebalanceInProgress()) { try { + if (shutdownRequested.get()) { + sendShutdownRequest(shutdownTypeRequested); + return; Review comment: I can't tell; are we guaranteed to actually send the joinGroup request by now? Maybe it's safer to just keep running this loop until we get back the "you should shut down" response assignment. We _could/should_ add a condition into `runOnce` so that we don't actually process anything once we have `shutdownRequested` set.
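The alternative described in the last comment on `runLoop` could look roughly like the sketch below: once `shutdownRequested` is set, the loop keeps polling so the request has a chance to go out, skips record processing, and exits only when the assignment carrying the shutdown code comes back. `FakeGroup`, `pollReturnsShutdownAssignment`, and `failAtIteration` are hypothetical names made up for this example; the sketch does not model the real consumer or rebalance protocol.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the suggested loop; not the StreamThread implementation.
final class RunLoopSketch {
    // Stand-in for the group round trip: the "shut down" assignment arrives
    // only after a given number of polls, once the request has been sent.
    static final class FakeGroup {
        private final int pollsUntilShutdownAssignment;
        private int polls = 0;

        FakeGroup(final int pollsUntilShutdownAssignment) {
            this.pollsUntilShutdownAssignment = pollsUntilShutdownAssignment;
        }

        // Returns true once the assignment with the shutdown code has arrived.
        boolean pollReturnsShutdownAssignment(final boolean shutdownRequested) {
            polls++;
            return shutdownRequested && polls >= pollsUntilShutdownAssignment;
        }
    }

    final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    int recordsProcessed = 0;
    int iterations = 0;

    // failAtIteration simulates the iteration in which the exception handler
    // asks for an application-wide shutdown.
    void runLoop(final FakeGroup group, final int failAtIteration) {
        if (failAtIteration < 1) {
            throw new IllegalArgumentException("failAtIteration must be >= 1");
        }
        boolean running = true;
        while (running) {
            iterations++;
            if (iterations == failAtIteration) {
                shutdownRequested.set(true);
            }
            // Keep polling instead of returning right away, so the request is
            // actually delivered before this thread stops.
            if (group.pollReturnsShutdownAssignment(shutdownRequested.get())) {
                running = false;
                continue;
            }
            // Guard in the spirit of the runOnce condition: no processing
            // once a shutdown has been requested.
            if (!shutdownRequested.get()) {
                recordsProcessed++;
            }
        }
    }

    public static void main(final String[] args) {
        final RunLoopSketch loop = new RunLoopSketch();
        loop.runLoop(new FakeGroup(5), 3);
        System.out.println("iterations=" + loop.iterations + ", processed=" + loop.recordsProcessed);
    }
}
```

With the shutdown requested in iteration 3 and the assignment arriving on the fifth poll, the loop runs five iterations but processes records only in the first two.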