frankvicky commented on code in PR #19400: URL: https://github.com/apache/kafka/pull/19400#discussion_r2076789736
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1791,13 +1791,16 @@ private long advanceNowAndComputeLatency() { public void shutdown(final boolean leaveGroup) { log.info("Informed to shut down"); final State oldState = setState(State.PENDING_SHUTDOWN); + if (leaveGroup) { + requestLeaveGroupDuringShutdown(); + } if (oldState == State.CREATED) { // The thread may not have been started. Take responsibility for shutting down - completeShutdown(true, leaveGroup); + completeShutdown(true); } } - private void completeShutdown(final boolean cleanRun, final boolean leaveGroup) { Review Comment: We could leverage the `leaveGroupRequested` and don't need a new argument. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1823,14 +1826,7 @@ private void completeShutdown(final boolean cleanRun, final boolean leaveGroup) log.error("Failed to close changelog reader due to the following error:", e); } try { - if (leaveGroupRequested.get()) { - mainConsumer.unsubscribe(); Review Comment: `Consumer# unsubscribe()` will make consumers leave the group. I believe this is a workaround, and now we could rely onthe new close API ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1454,10 +1450,6 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b } }); - if (leaveGroup) { - processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs)); - } Review Comment: As previous discussion, we should only close the consumer in `StreamThread#completeShutdown` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org