frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2076789736


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1791,13 +1791,16 @@ private long advanceNowAndComputeLatency() {
     public void shutdown(final boolean leaveGroup) {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        if (leaveGroup) {
+            requestLeaveGroupDuringShutdown();
+        }
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
-            completeShutdown(true, leaveGroup);
+            completeShutdown(true);
         }
     }
 
-    private void completeShutdown(final boolean cleanRun, final boolean leaveGroup) {

Review Comment:
   We could leverage `leaveGroupRequested` here, so we don't need an extra argument.
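
   A minimal sketch of that idea, under stated assumptions: `ShutdownFlagSketch` is a hypothetical stand-in for `StreamThread`, and the `started`, `leftGroup`, and `closed` fields exist only for illustration. It is not the actual Kafka code; it only shows the shutdown path reading a shared flag instead of taking a parameter.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for StreamThread; NOT the actual Kafka implementation.
public class ShutdownFlagSketch {
    // Shared flag that replaces the leaveGroup parameter of completeShutdown.
    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);

    // Illustration-only state (assumptions, not fields of StreamThread).
    private boolean started = false;   // models "oldState != CREATED"
    private boolean leftGroup = false; // records whether shutdown left the group
    private boolean closed = false;    // records whether shutdown completed

    public void requestLeaveGroupDuringShutdown() {
        leaveGroupRequested.set(true);
    }

    public void shutdown(final boolean leaveGroup) {
        // Record the caller's intent once, before any shutdown work happens.
        if (leaveGroup) {
            requestLeaveGroupDuringShutdown();
        }
        if (!started) {
            // The thread never ran, so the caller completes the shutdown itself.
            completeShutdown(true);
        }
    }

    // No leaveGroup parameter: the decision is read from the shared flag.
    // cleanRun is kept only to mirror the signature in the diff; it is unused here.
    private void completeShutdown(final boolean cleanRun) {
        leftGroup = leaveGroupRequested.get();
        closed = true;
    }

    public boolean leftGroup() {
        return leftGroup;
    }

    public boolean isClosed() {
        return closed;
    }

    public static void main(final String[] args) {
        final ShutdownFlagSketch leaving = new ShutdownFlagSketch();
        leaving.shutdown(true);
        System.out.println("leaveGroup=true  -> leftGroup=" + leaving.leftGroup());

        final ShutdownFlagSketch staying = new ShutdownFlagSketch();
        staying.shutdown(false);
        System.out.println("leaveGroup=false -> leftGroup=" + staying.leftGroup());
    }
}
```

   Because the flag is set before `completeShutdown` runs, the same code path works whether the shutdown is completed by the caller or later by the thread itself.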



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1823,14 +1826,7 @@ private void completeShutdown(final boolean cleanRun, final boolean leaveGroup)
             log.error("Failed to close changelog reader due to the following error:", e);
         }
         try {
-            if (leaveGroupRequested.get()) {
-                mainConsumer.unsubscribe();

Review Comment:
   `Consumer#unsubscribe()` makes the consumer leave the group.
   I believe this was a workaround, and now we can rely on the new close API.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1454,10 +1450,6 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
                 }
             });
 
-            if (leaveGroup) {
-                processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
-            }

Review Comment:
   As discussed previously, we should only close the consumer in `StreamThread#completeShutdown`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
