This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035) 9dc332f5ca is described below commit 9dc332f5ca34b80af369646f767c40c6b189f831 Author: Sayantanu Dey <41713730+sayantanu-...@users.noreply.github.com> AuthorDate: Mon May 23 22:37:19 2022 +0530 KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035) This is for KIP-812: * added leaveGroup on a new close function in kafka stream * added logic to resolve future returned by remove member call in close method * added max check on remainingTime value in close function Reviewers: David Jacot <david.ja...@gmail.com>, Guozhang Wang <wangg...@gmail.com> --- .../org/apache/kafka/streams/KafkaStreams.java | 73 ++++++++ .../org/apache/kafka/streams/KafkaStreamsTest.java | 203 +++++++++++++++++++++ 2 files changed, 276 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 9d9d57f8ab..ed62f89ebc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1349,6 +1349,24 @@ public class KafkaStreams implements AutoCloseable { } } + /** + * Class that handles options passed in case of {@code KafkaStreams} instance scale down + */ + public static class CloseOptions { + private Duration timeout = Duration.ofMillis(Long.MAX_VALUE); + private boolean leaveGroup = false; + + public CloseOptions timeout(final Duration timeout) { + this.timeout = timeout; + return this; + } + + public CloseOptions leaveGroup(final boolean leaveGroup) { + this.leaveGroup = leaveGroup; + return this; + } + } + /** * Shutdown this {@code KafkaStreams} instance by signaling all the threads to stop, and then wait for them to join. * This will block until all threads have stopped. @@ -1498,6 +1516,61 @@ public class KafkaStreams implements AutoCloseable { return close(timeoutMs); } + /** + * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the + * threads to join. + * @param options contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to + * trigger consumer leave call + * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached + * before all threads stopped + * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}. + * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds} + */ + public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException { + final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout"); + final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix); + if (timeoutMs < 0) { + throw new IllegalArgumentException("Timeout can't be negative."); + } + + final long startMs = time.milliseconds(); + + final boolean closeStatus = close(timeoutMs); + + final Optional<String> groupInstanceId = clientSupplier + .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)) + .groupMetadata() + .groupInstanceId(); + + final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs)); + + if (options.leaveGroup && groupInstanceId.isPresent()) { + log.debug("Sending leave group trigger to removing instance from consumer group"); + //removing instance from consumer group + + final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get()); + + final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove); + + final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient + .removeMembersFromConsumerGroup( + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), + new RemoveMembersFromConsumerGroupOptions(membersToRemove) + ); + + try { + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + log.error("Could not remove static member {} from consumer group {} due to a: {}", groupInstanceId.get(), + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e); + } + } + + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); + + return closeStatus; + } + /** * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}. diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index eabb8edc11..3e37e097cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -20,9 +20,13 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.MetricConfig; @@ -91,6 +95,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutionException; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -797,6 +802,72 @@ public class KafkaStreamsTest { assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); } + @Test + public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws InterruptedException, ExecutionException { + + final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class); + + final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class); + + final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class) + .addMockedMethod("removeMembersFromConsumerGroup").createMock(); + + final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class) + .addMockedMethod("groupMetadata").createMock(); + + final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class); + + final Optional<String> groupInstanceId = Optional.of("test-instance-id"); + + EasyMock.expect(memberResultFuture.get()); + EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture); + EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId); + EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result); + EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata); + + final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class) + .addMockedMethod("getAdmin") + .addMockedMethod("getConsumer") + .createMock(); + + EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient); + EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer); + + EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier); + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time); + streams.start(); + waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); + + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ZERO); + closeOptions.leaveGroup(true); + + streams.close(closeOptions); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + } + + @Test + public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupTrue() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + streams.start(); + waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); + + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ZERO); + + streams.close(closeOptions); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + } + @Test public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { @@ -919,6 +990,138 @@ public class KafkaStreamsTest { } } + @Test + public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThreadsHaventTerminated() { + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ofMillis(10L)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) { + assertFalse(streams.close(closeOptions)); + } + } + + @Test + public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() { + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ofMillis(-1L)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions)); + } + } + + @Test + public void shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() { + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ZERO); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) { + assertFalse(streams.close(closeOptions)); + } + } + + @Test + public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() throws ExecutionException, InterruptedException { + + final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class); + + final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class); + + final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class) + .addMockedMethod("removeMembersFromConsumerGroup").createMock(); + + final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class) + .addMockedMethod("groupMetadata").createMock(); + + final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class); + + final Optional<String> groupInstanceId = Optional.of("test-instance-id"); + + EasyMock.expect(memberResultFuture.get()); + EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture); + EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId); + EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result); + EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata); + + final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class) + .addMockedMethod("getAdmin") + .addMockedMethod("getConsumer") + .createMock(); + + EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient); + EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer); + + EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier); + + + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ofMillis(10L)); + closeOptions.leaveGroup(true); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) { + assertFalse(streams.close(closeOptions)); + } + } + + @Test + public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() { + final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class); + + final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class) + .addMockedMethod("removeMembersFromConsumerGroup").createMock(); + + EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result); + + final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class) + .addMockedMethod("getAdmin").createMock(); + EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient); + + EasyMock.replay(result, mockAdminClient, mockClientSupplier); + + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ofMillis(-1L)); + closeOptions.leaveGroup(true); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) { + assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions)); + } + } + + @Test + public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws ExecutionException, InterruptedException { + final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class); + + final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class); + + final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class) + .addMockedMethod("removeMembersFromConsumerGroup").createMock(); + + final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class) + .addMockedMethod("groupMetadata").createMock(); + + final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class); + + final Optional<String> groupInstanceId = Optional.of("test-instance-id"); + + EasyMock.expect(memberResultFuture.get()); + EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture); + EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId); + EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result); + EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata); + + final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class) + .addMockedMethod("getAdmin") + .addMockedMethod("getConsumer") + .createMock(); + + EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient); + EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer); + + EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier); + + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ZERO); + closeOptions.leaveGroup(true); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) { + assertFalse(streams.close(closeOptions)); + } + } + @Test public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() { PowerMock.mockStatic(Executors.class);