chia7712 commented on code in PR #19400: URL: https://github.com/apache/kafka/pull/19400#discussion_r2134045418
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1816,10 +1820,15 @@ private long advanceNowAndComputeLatency() { * <p> * Note that there is nothing to prevent this function from being called multiple times * (e.g., in testing), hence the state is set only the first time + * + * @param leaveGroup this flag will control whether the consumer will leave the group on close or not */ - public void shutdown() { + public void shutdown(final boolean leaveGroup) { log.info("Informed to shut down"); final State oldState = setState(State.PENDING_SHUTDOWN); + if (leaveGroup) { + requestLeaveGroupDuringShutdown(); Review Comment: `requestLeaveGroupDuringShutdown` is no longer used by other classes, so we can remove it and call `leaveGroupRequested.set(true);` directly here. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1181,9 +1177,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName()); if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) { log.info("Removing StreamThread {}", streamThread.getName()); - final Optional<String> groupInstanceID = streamThread.groupInstanceID(); - streamThread.requestLeaveGroupDuringShutdown(); - streamThread.shutdown(); + streamThread.shutdown(true); if (!streamThread.getName().equals(Thread.currentThread().getName())) { Review Comment: As you are updating this file, please replace `!streamThread.getName().equals(Thread.currentThread().getName())` with `callingThreadIsNotCurrentStreamThread`. 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1824,12 +1834,13 @@ private void completeShutdown(final boolean cleanRun) { log.error("Failed to unsubscribe due to the following error: ", e); } try { - mainConsumer.close(); + final GroupMembershipOperation membershipOperation = leaveGroup ? LEAVE_GROUP : REMAIN_IN_GROUP; + mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation)); } catch (final Throwable e) { log.error("Failed to close consumer due to the following error:", e); } try { - restoreConsumer.close(); + restoreConsumer.close(CloseOptions.groupMembershipOperation(REMAIN_IN_GROUP)); Review Comment: The restore consumer does not use a consumer group, so it should be fine to call `restoreConsumer.close();`, right? ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread); resizeThreadCache(cacheSizePerThread); - if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { - final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get()); - final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove); - final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = - adminClient.removeMembersFromConsumerGroup( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), - new RemoveMembersFromConsumerGroupOptions(membersToRemove) - ); - try { - final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs); - removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS); - } catch (final 
java.util.concurrent.TimeoutException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to a timeout:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new TimeoutException(exception.getMessage(), exception); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (final ExecutionException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new StreamsException( - "Could not remove static member " + groupInstanceID.get() - + " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - + " for the following reason: ", - exception.getCause() - ); - } Review Comment: It seems the behavior of `KafkaStreams#removeStreamThread` has changed. Without an explicit removal, the consumer is removed from the group only after the stream thread completes its `run` function. This means users CANNOT assume `KafkaStreams#removeStreamThread` will remove the member from the group. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -490,7 +487,7 @@ private void replaceStreamThread(final Throwable throwable) { closeToError(); } final StreamThread deadThread = (StreamThread) Thread.currentThread(); - deadThread.shutdown(); + deadThread.shutdown(true); Review Comment: It seems the original behavior was not to leave the group, right? Otherwise, it will trigger an extra rebalance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org