This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dc332f5ca KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035)
9dc332f5ca is described below

commit 9dc332f5ca34b80af369646f767c40c6b189f831
Author: Sayantanu Dey <41713730+sayantanu-...@users.noreply.github.com>
AuthorDate: Mon May 23 22:37:19 2022 +0530

    KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so (#12035)
    
    This is for KIP-812:
    
    * added a leaveGroup option to a new close overload in Kafka Streams
    * added logic to resolve the future returned by the remove-member call in the close method
    * added a max check on the remainingTime value in the close function
    
    
    Reviewers: David Jacot <david.ja...@gmail.com>, Guozhang Wang <wangg...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  73 ++++++++
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 203 +++++++++++++++++++++
 2 files changed, 276 insertions(+)
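
For readers unfamiliar with KIP-812, here is a minimal usage sketch of the new API added by this commit. The application id, bootstrap servers, topic name, and group.instance.id below are illustrative assumptions, not values taken from the patch:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class CloseOptionsExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // static membership id; the new leaveGroup option only takes effect when the consumer reports one
            props.put(StreamsConfig.consumerPrefix("group.instance.id"), "example-instance-1");

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("example-input-topic");

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // ... later, when scaling this instance down:
            final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
                    .timeout(Duration.ofMinutes(1))   // how long close() may block
                    .leaveGroup(true);                // also remove this static member from the group
            final boolean allThreadsStopped = streams.close(options);
            System.out.println("Clean shutdown: " + allThreadsStopped);
        }
    }

Note that close(CloseOptions) returns true only if all stream threads stopped within the timeout; a failure of the leave-group request is logged but does not change the return value.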

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9d9d57f8ab..ed62f89ebc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1349,6 +1349,24 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
+    /**
+     * Class that handles options passed in case of {@code KafkaStreams} instance scale down
+     */
+    public static class CloseOptions {
+        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
+        private boolean leaveGroup = false;
+
+        public CloseOptions timeout(final Duration timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        public CloseOptions leaveGroup(final boolean leaveGroup) {
+            this.leaveGroup = leaveGroup;
+            return this;
+        }
+    }
+
     /**
      * Shutdown this {@code KafkaStreams} instance by signaling all the threads to stop, and then wait for them to join.
      * This will block until all threads have stopped.
@@ -1498,6 +1516,61 @@ public class KafkaStreams implements AutoCloseable {
         return close(timeoutMs);
     }
 
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options  contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to
+     *                 trigger consumer leave call
+     * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
+     * before all threads stopped
+     * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
+        final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout can't be negative.");
+        }
+
+        final long startMs = time.milliseconds();
+
+        final boolean closeStatus = close(timeoutMs);
+
+        final Optional<String> groupInstanceId = clientSupplier
+                .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
+                .groupMetadata()
+                .groupInstanceId();
+
+        final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs));
+
+        if (options.leaveGroup && groupInstanceId.isPresent()) {
+            log.debug("Sending leave group trigger to removing instance from consumer group");
+            //removing instance from consumer group
+
+            final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
+
+            final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+
+            final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
+                    .removeMembersFromConsumerGroup(
+                        applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
+                        new RemoveMembersFromConsumerGroupOptions(membersToRemove)
+                    );
+
+            try {
+                removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
+            } catch (final Exception e) {
+                log.error("Could not remove static member {} from consumer group {} due to a: {}", groupInstanceId.get(),
+                        applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+            }
+        }
+
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+
+        return closeStatus;
+    }
+
     /**
      * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
      * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
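
As an aside, the leave-group step added above uses plain Admin-client functionality; a standalone sketch of the same call is shown below for context. The bootstrap servers, group id, instance id, and wait time are illustrative assumptions:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.MemberToRemove;
    import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
    import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;

    public class LeaveGroupSketch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
            try (final Admin admin = Admin.create(props)) {
                // one static member, identified by its group.instance.id
                final MemberToRemove member = new MemberToRemove("example-instance-1");
                final RemoveMembersFromConsumerGroupResult removal = admin.removeMembersFromConsumerGroup(
                        "example-app", // for Streams, the group id is the application.id
                        new RemoveMembersFromConsumerGroupOptions(Collections.singletonList(member)));
                // block (bounded) until the broker confirms the member left the group
                removal.memberResult(member).get(30, TimeUnit.SECONDS);
            }
        }
    }
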
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index eabb8edc11..3e37e097cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -20,9 +20,13 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -91,6 +95,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutionException;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
@@ -797,6 +802,72 @@ public class KafkaStreamsTest {
         assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws InterruptedException, ExecutionException {
+
+        final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class);
+
+        final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class);
+
+        final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class)
+                .addMockedMethod("removeMembersFromConsumerGroup").createMock();
+
+        final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class)
+                .addMockedMethod("groupMetadata").createMock();
+
+        final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class);
+
+        final Optional<String> groupInstanceId = Optional.of("test-instance-id");
+
+        EasyMock.expect(memberResultFuture.get());
+        EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture);
+        EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId);
+        EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result);
+        EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+
+        final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class)
+                .addMockedMethod("getAdmin")
+                .addMockedMethod("getConsumer")
+                .createMock();
+
+        EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient);
+        EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer);
+
+        EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier);
+
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time);
+        streams.start();
+        waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ZERO);
+        closeOptions.leaveGroup(true);
+
+        streams.close(closeOptions);
+        assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
+        assertThrows(IllegalStateException.class, streams::cleanUp);
+        assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
+    }
+
+    @Test
+    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupTrue() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ZERO);
+
+        streams.close(closeOptions);
+        assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
+        assertThrows(IllegalStateException.class, streams::cleanUp);
+        assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
+    }
+
     @Test
     public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
@@ -919,6 +990,138 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThreadsHaventTerminated() {
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ofMillis(10L));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+            assertFalse(streams.close(closeOptions));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() {
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ofMillis(-1L));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
+        }
+    }
+
+    @Test
+    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() {
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ZERO);
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+            assertFalse(streams.close(closeOptions));
+        }
+    }
+
+    @Test
+    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() throws ExecutionException, InterruptedException {
+
+        final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class);
+
+        final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class);
+
+        final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class)
+                .addMockedMethod("removeMembersFromConsumerGroup").createMock();
+
+        final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class)
+                .addMockedMethod("groupMetadata").createMock();
+
+        final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class);
+
+        final Optional<String> groupInstanceId = Optional.of("test-instance-id");
+
+        EasyMock.expect(memberResultFuture.get());
+        EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture);
+        EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId);
+        EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result);
+        EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+
+        final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class)
+                .addMockedMethod("getAdmin")
+                .addMockedMethod("getConsumer")
+                .createMock();
+
+        EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient);
+        EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer);
+
+        EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier);
+
+
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ofMillis(10L));
+        closeOptions.leaveGroup(true);
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+            assertFalse(streams.close(closeOptions));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() {
+        final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class);
+
+        final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class)
+                .addMockedMethod("removeMembersFromConsumerGroup").createMock();
+
+        EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result);
+
+        final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class)
+                .addMockedMethod("getAdmin").createMock();
+        EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient);
+
+        EasyMock.replay(result, mockAdminClient, mockClientSupplier);
+
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ofMillis(-1L));
+        closeOptions.leaveGroup(true);
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
+            assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
+        }
+    }
+
+    @Test
+    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws ExecutionException, InterruptedException {
+        final RemoveMembersFromConsumerGroupResult result = EasyMock.mock(RemoveMembersFromConsumerGroupResult.class);
+
+        final KafkaFuture<Void> memberResultFuture = EasyMock.mock(KafkaFuture.class);
+
+        final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class)
+                .addMockedMethod("removeMembersFromConsumerGroup").createMock();
+
+        final MockConsumer<byte[], byte[]> mockConsumer = EasyMock.partialMockBuilder(MockConsumer.class)
+                .addMockedMethod("groupMetadata").createMock();
+
+        final ConsumerGroupMetadata consumerGroupMetadata = EasyMock.mock(ConsumerGroupMetadata.class);
+
+        final Optional<String> groupInstanceId = Optional.of("test-instance-id");
+
+        EasyMock.expect(memberResultFuture.get());
+        EasyMock.expect(result.memberResult(anyObject())).andStubReturn(memberResultFuture);
+        EasyMock.expect(consumerGroupMetadata.groupInstanceId()).andReturn(groupInstanceId);
+        EasyMock.expect(mockAdminClient.removeMembersFromConsumerGroup(anyObject(), anyObject())).andStubReturn(result);
+        EasyMock.expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+
+        final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class)
+                .addMockedMethod("getAdmin")
+                .addMockedMethod("getConsumer")
+                .createMock();
+
+        EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient);
+        EasyMock.expect(mockClientSupplier.getConsumer(anyObject())).andReturn(mockConsumer);
+
+        EasyMock.replay(result, consumerGroupMetadata, mockConsumer, mockAdminClient, mockClientSupplier);
+
+        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
+        closeOptions.timeout(Duration.ZERO);
+        closeOptions.leaveGroup(true);
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+            assertFalse(streams.close(closeOptions));
+        }
+    }
+
     @Test
     public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
         PowerMock.mockStatic(Executors.class);
