lucasbru commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1939279250


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, 
final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
 
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
+        if (!setState(State.PENDING_SHUTDOWN)) {
+
+            if (state.isShuttingDown()) {

Review Comment:
   I tried to preserving the existing log messages. But yeah, it also came to 
my mind that this could be simplified. I made a pass and cleaned up the cases.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, 
final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
 
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
+        if (!setState(State.PENDING_SHUTDOWN)) {
+
+            if (state.isShuttingDown()) {
+                log.info("Streams client is in {}, all resources are being 
closed and the client will be stopped.", state);
+                if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {

Review Comment:
   You are right, there is another race condition here, although I guess the 
effect is not as bad as the actual bug that I'm fixing here. We are logging 
that we ran into timeout when we didn't. Anyway, good find and I will 
piggy-back a fix for that race condition in this PR.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, 
final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
 
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
+        if (!setState(State.PENDING_SHUTDOWN)) {
+
+            if (state.isShuttingDown()) {
+                log.info("Streams client is in {}, all resources are being 
closed and the client will be stopped.", state);
+                if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
+                    log.info("Streams client stopped to ERROR completely");
+                    return true;
+                } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+                    log.info("Streams client stopped to NOT_RUNNING 
completely");
+                    return true;
+                } else {
+                    log.warn("Streams client cannot transition to {} 
completely within the timeout",
+                        state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : 
State.ERROR);
+                    return false;
+                }
+            }
+
+            if (state.hasCompletedShutdown()) {
+                log.info("Streams client is already in the terminal {} state, 
all resources are closed and the client has stopped.", state);
                 return true;
-            } else {
-                log.warn("Streams client cannot transition to {} completely 
within the timeout",
-                         state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : 
State.ERROR);
-                return false;
             }
-        }
 
-        if (!setState(State.PENDING_SHUTDOWN)) {
             // if we can't transition to PENDING_SHUTDOWN but not because 
we're already shutting down, then it must be fatal

Review Comment:
   This should indeed be unreachable in the current implementation. In the 
fixed code, this exception is just a safeguard in case somebody changes the 
state transitions, which can break the invariants in this code. We don't want 
to silently ignore it. I'll convert it into an `IllegalStateException`, to make 
clear this is not expected.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, 
final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
 
-        if (state.hasCompletedShutdown()) {

Review Comment:
   We could keep it, but it would just duplicate cases. I think having fewer 
branches is better.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, 
final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
 
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
+        if (!setState(State.PENDING_SHUTDOWN)) {
+
+            if (state.isShuttingDown()) {
+                log.info("Streams client is in {}, all resources are being 
closed and the client will be stopped.", state);
+                if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
+                    log.info("Streams client stopped to ERROR completely");
+                    return true;
+                } else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+                    log.info("Streams client stopped to NOT_RUNNING 
completely");
+                    return true;
+                } else {
+                    log.warn("Streams client cannot transition to {} 
completely within the timeout",
+                        state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : 
State.ERROR);
+                    return false;
+                }
+            }
+
+            if (state.hasCompletedShutdown()) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to