chia7712 commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2134045418


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1816,10 +1820,15 @@ private long advanceNowAndComputeLatency() {
      * <p>
      * Note that there is nothing to prevent this function from being called 
multiple times
      * (e.g., in testing), hence the state is set only the first time
+     *
+     * @param leaveGroup this flag will control whether the consumer will 
leave the group on close or not
      */
-    public void shutdown() {
+    public void shutdown(final boolean leaveGroup) {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        if (leaveGroup) {
+            requestLeaveGroupDuringShutdown();

Review Comment:
   `requestLeaveGroupDuringShutdown` is no longer used by other classes, so we 
can remove it and call `leaveGroupRequested.set(true);` directly here.
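
A minimal sketch of what inlining the flag could look like. `ShutdownSketch` is a hypothetical stand-in for `StreamThread`, not the real class; only the `leaveGroupRequested` field and the `shutdown(boolean)` signature come from the diff above, and the state handling is simplified for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for StreamThread (assumption, not the real class).
class ShutdownSketch {
    enum State { RUNNING, PENDING_SHUTDOWN }

    // Mirrors the `leaveGroupRequested` field named in the review comment.
    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
    private State state = State.RUNNING;

    // The flag is set inline, so no separate requestLeaveGroupDuringShutdown() helper is needed.
    public synchronized void shutdown(final boolean leaveGroup) {
        state = State.PENDING_SHUTDOWN;
        if (leaveGroup) {
            leaveGroupRequested.set(true);
        }
    }

    public boolean leaveGroupRequested() {
        return leaveGroupRequested.get();
    }

    public synchronized State state() {
        return state;
    }
}
```

With this shape, the caller's intent is carried entirely by the `leaveGroup` argument, and the flag has a single writer.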



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1181,9 +1177,7 @@ private Optional<String> removeStreamThread(final long 
timeoutMs) throws Timeout
                     final boolean callingThreadIsNotCurrentStreamThread = 
!streamThread.getName().equals(Thread.currentThread().getName());
                     if (streamThread.isThreadAlive() && 
(callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
                         log.info("Removing StreamThread {}", 
streamThread.getName());
-                        final Optional<String> groupInstanceID = 
streamThread.groupInstanceID();
-                        streamThread.requestLeaveGroupDuringShutdown();
-                        streamThread.shutdown();
+                        streamThread.shutdown(true);
                         if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {

Review Comment:
   As you are updating this file, please replace 
`!streamThread.getName().equals(Thread.currentThread().getName())` with 
`callingThreadIsNotCurrentStreamThread`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1824,12 +1834,13 @@ private void completeShutdown(final boolean cleanRun) {
             log.error("Failed to unsubscribe due to the following error: ", e);
         }
         try {
-            mainConsumer.close();
+            final GroupMembershipOperation membershipOperation = leaveGroup ? 
LEAVE_GROUP : REMAIN_IN_GROUP;
+            
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
         } catch (final Throwable e) {
             log.error("Failed to close consumer due to the following error:", 
e);
         }
         try {
-            restoreConsumer.close();
+            
restoreConsumer.close(CloseOptions.groupMembershipOperation(REMAIN_IN_GROUP));

Review Comment:
   The restore consumer does not use a consumer group, so it should be fine to 
call `restoreConsumer.close();`, right?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long 
timeoutMs) throws Timeout
                         final long cacheSizePerThread = 
cacheSizePerThread(numLiveStreamThreads());
                         log.info("Resizing thread cache due to thread removal, 
new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
-                        if (groupInstanceID.isPresent() && 
callingThreadIsNotCurrentStreamThread) {
-                            final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceID.get());
-                            final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
-                            final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
-                                adminClient.removeMembersFromConsumerGroup(
-                                    
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
-                                    new 
RemoveMembersFromConsumerGroupOptions(membersToRemove)
-                                );
-                            try {
-                                final long remainingTimeMs = timeoutMs - 
(time.milliseconds() - startMs);
-                                
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
 TimeUnit.MILLISECONDS);
-                            } catch (final 
java.util.concurrent.TimeoutException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to a timeout:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new 
TimeoutException(exception.getMessage(), exception);
-                            } catch (final InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            } catch (final ExecutionException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new StreamsException(
-                                        "Could not remove static member " + 
groupInstanceID.get()
-                                            + " from consumer group " + 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                            + " for the following reason: ",
-                                        exception.getCause()
-                                );
-                            }

Review Comment:
   It seems the behavior of `KafkaStreams#removeStreamThread` has changed. 
Without an explicit removal, the consumer is removed from the group only after 
the stream thread completes its `run` function. This means users CANNOT assume 
that `KafkaStreams#removeStreamThread` will remove the member from the group. 
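
To illustrate the timing concern, here is a self-contained sketch that models it with plain Java threads. Everything in it is hypothetical: `AsyncLeaveSketch`, `memberInGroup`, and `awaitTermination` are illustration-only names, and the model assumes the consumer is closed at the end of `run`, as described in the comment above. It does not use the Kafka client API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: the "member" leaves the group only when the worker's run() finishes.
class AsyncLeaveSketch {
    // Stand-in for group membership as seen by the broker (assumption).
    final AtomicBoolean memberInGroup = new AtomicBoolean(true);
    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
    private final CountDownLatch shutdownSignal = new CountDownLatch(1);
    private final Thread worker = new Thread(this::run);

    void start() {
        worker.start();
    }

    private void run() {
        try {
            shutdownSignal.await(); // stands in for the processing loop
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Models closing the consumer at the end of run(): only here does the member leave.
        if (leaveGroupRequested.get()) {
            memberInGroup.set(false);
        }
    }

    // Records the request and returns immediately; it does not wait for the member to leave.
    void shutdown(final boolean leaveGroup) {
        if (leaveGroup) {
            leaveGroupRequested.set(true);
        }
        shutdownSignal.countDown();
    }

    // A caller that needs the member to be gone has to wait for the worker to finish.
    void awaitTermination() {
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model, the state of `memberInGroup` right after `shutdown(true)` returns is not guaranteed. It is reliably `false` only after `awaitTermination()`, which is the gap the comment points at.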



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -490,7 +487,7 @@ private void replaceStreamThread(final Throwable throwable) 
{
             closeToError();
         }
         final StreamThread deadThread = (StreamThread) Thread.currentThread();
-        deadThread.shutdown();
+        deadThread.shutdown(true);

Review Comment:
   It seems the original behavior was not to leave the group, right? Otherwise, 
it would trigger an extra rebalance.


