This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new ff7cbf264c KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
ff7cbf264c is described below

commit ff7cbf264c0cb79ae56209ac381c51ef588ab97a
Author: James Hughes <jhug...@confluent.io>
AuthorDate: Thu Jul 21 10:35:29 2022 -0400

    KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)

    - the static memberId used was incorrect
    - need to remove all threads/members from the group
    - need to use the admin client correctly

    Add test to verify fixes.

    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  69 ++++---
 .../KafkaStreamsCloseOptionsIntegrationTest.java   | 198 +++++++++++++++++++++
 2 files changed, 231 insertions(+), 36 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 05d99dd172..02b576c186 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -75,13 +75,11 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-
 import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.function.BiConsumer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -102,6 +100,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -1350,10 +1349,10 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(Long.MAX_VALUE);
+        close(Long.MAX_VALUE, false);
     }
 
-    private Thread shutdownHelper(final boolean error) {
+    private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
         stateDirCleaner.shutdownNow();
         if (rocksDBMetricsRecordingService != null) {
             rocksDBMetricsRecordingService.shutdownNow();
@@ -1385,6 +1384,10 @@ public class KafkaStreams implements AutoCloseable {
             }
         });
 
+        if (leaveGroup) {
+            processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
+        }
+
         log.info("Shutdown {} stream threads complete", numStreamThreads);
 
         if (globalStreamThread != null) {
@@ -1419,7 +1422,7 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final long timeoutMs) {
+    private boolean close(final long timeoutMs, final boolean leaveGroup) {
         if (state.hasCompletedShutdown()) {
             log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
             return true;
@@ -1444,7 +1447,8 @@ public class KafkaStreams implements AutoCloseable {
             log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", state);
             throw new StreamsException("Failed to shut down while in state " + state);
         } else {
-            final Thread shutdownThread = shutdownHelper(false);
+
+            final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
 
             shutdownThread.setDaemon(true);
             shutdownThread.start();
@@ -1463,7 +1467,7 @@ public class KafkaStreams implements AutoCloseable {
             if (!setState(State.PENDING_ERROR)) {
                 log.info("Skipping shutdown since we are already in " + state());
             } else {
-                final Thread shutdownThread = shutdownHelper(true);
+                final Thread shutdownThread = shutdownHelper(true, -1, false);
 
                 shutdownThread.setDaemon(true);
                 shutdownThread.start();
@@ -1491,7 +1495,7 @@
 
         log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
 
-        return close(timeoutMs);
+        return close(timeoutMs, false);
     }
 
     /**
@@ -1505,48 +1509,41 @@
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
     public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        Objects.requireNonNull(options, "options cannot be null");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
         final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+        return close(timeoutMs, options.leaveGroup);
+    }
 
-        final long startMs = time.milliseconds();
-
-        final boolean closeStatus = close(timeoutMs);
-
-        final Optional<String> groupInstanceId = clientSupplier
-            .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
-            .groupMetadata()
-            .groupInstanceId();
-
-        final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs));
-
-        if (options.leaveGroup && groupInstanceId.isPresent()) {
-            log.debug("Sending leave group trigger to removing instance from consumer group");
-            //removing instance from consumer group
-
-            final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
-
-            final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+    private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
+        return thread -> {
+            final Optional<String> groupInstanceId = thread.getGroupInstanceID();
+            if (groupInstanceId.isPresent()) {
+                log.debug("Sending leave group trigger to removing instance from consumer group: {}.",
+                    groupInstanceId.get());
+                final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
+                final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
 
-            final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
+                final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
                     .removeMembersFromConsumerGroup(
                         applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
                         new RemoveMembersFromConsumerGroupOptions(membersToRemove)
                     );
 
-            try {
-                removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
-            } catch (final Exception e) {
-                log.error("Could not remove static member {} from consumer group {} due to a: {}", groupInstanceId.get(),
+                try {
+                    removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
+                        .get(remainingTimeMs, TimeUnit.MILLISECONDS);
+                } catch (final Exception e) {
+                    log.error("Could not remove static member {} from consumer group {} due to a: {}",
+                        groupInstanceId.get(),
                         applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+                }
             }
-        }
-
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
-
-        return closeStatus;
+        };
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
new file mode 100644
index 0000000000..8d3cb8e879
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // In this test, we set the SESSION_TIMEOUT_MS_CONFIG high in order to show that the call to
+        // `close(CloseOptions)` can remove the application from the Consumer Groups successfully.
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(30));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
+        waitForEmptyConsumerGroup(adminClient, appID, 0);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
+
+        return builder.build();
+    }
+
+    private void add10InputElements() {
+        final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
+                KeyValue.pair(1L, "bbb"),
+                KeyValue.pair(0L, "ccc"),
+                KeyValue.pair(1L, "ddd"),
+                KeyValue.pair(0L, "eee"),
+                KeyValue.pair(1L, "fff"),
+                KeyValue.pair(0L, "ggg"),
+                KeyValue.pair(1L, "hhh"),
+                KeyValue.pair(0L, "iii"),
+                KeyValue.pair(1L, "jjj"));
+
+        for (final KeyValue<Long, String> record : records) {
+            mockTime.sleep(10);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
+        }
+    }
+}
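
For readers of this change, the sketch below shows how an application is expected to use the repaired close(CloseOptions) path. It is a minimal example distilled from the integration test above, not part of the patch; the application id, bootstrap servers, topic names, and group.instance.id are placeholder values. With static membership configured, leaveGroup(true) now removes each stream thread's member from the consumer group via the admin client instead of leaving members to expire via the session timeout.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CloseOptionsExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Placeholder application id and bootstrap servers; adjust for your environment.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-options-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Static membership: the leave-group-on-close path resolves each stream thread's
        // group.instance.id and removes that member from the group via the admin client.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "example-instance");

        final StreamsBuilder builder = new StreamsBuilder();
        // Trivial pass-through topology over placeholder topics.
        builder.stream("example-input").to("example-output");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // ... run until shutdown is requested ...

        // Leave the consumer group on close (bounded by a 30-second timeout) rather than
        // waiting for the broker-side session timeout to expire.
        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
    }
}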