lucasbru commented on code in PR #18765: URL: https://github.com/apache/kafka/pull/18765#discussion_r1939279250
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) { timeoutMs = Long.MAX_VALUE; } - if (state.hasCompletedShutdown()) { - log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); - return true; - } - if (state.isShuttingDown()) { - log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); - if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { - log.info("Streams client stopped to ERROR completely"); - return true; - } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { - log.info("Streams client stopped to NOT_RUNNING completely"); + if (!setState(State.PENDING_SHUTDOWN)) { + + if (state.isShuttingDown()) { Review Comment: I tried to preserving the existing log messages. But yeah, it also came to my mind that this could be simplified. I made a pass and cleaned up the cases. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) { timeoutMs = Long.MAX_VALUE; } - if (state.hasCompletedShutdown()) { - log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); - return true; - } - if (state.isShuttingDown()) { - log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); - if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { - log.info("Streams client stopped to ERROR completely"); - return true; - } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { - log.info("Streams client stopped to NOT_RUNNING completely"); + if (!setState(State.PENDING_SHUTDOWN)) { + + if (state.isShuttingDown()) { + log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); + if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { Review Comment: You are right, there is another race condition here, although I guess the effect is not as bad as the actual bug that I'm fixing here. We are logging that we ran into timeout when we didn't. Anyway, good find and I will piggy-back a fix for that race condition in this PR. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) { timeoutMs = Long.MAX_VALUE; } - if (state.hasCompletedShutdown()) { - log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); - return true; - } - if (state.isShuttingDown()) { - log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); - if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { - log.info("Streams client stopped to ERROR completely"); - return true; - } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { - log.info("Streams client stopped to NOT_RUNNING completely"); + if (!setState(State.PENDING_SHUTDOWN)) { + + if (state.isShuttingDown()) { + log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); + if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { + log.info("Streams client stopped to ERROR completely"); + return true; + } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { + log.info("Streams client stopped to NOT_RUNNING completely"); + return true; + } else { + log.warn("Streams client cannot transition to {} completely within the timeout", + state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR); + return false; + } + } + + if (state.hasCompletedShutdown()) { + log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); return true; - } else { - log.warn("Streams client cannot transition to {} completely within the timeout", - state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR); - return false; } - } - if (!setState(State.PENDING_SHUTDOWN)) { // if we can't transition to PENDING_SHUTDOWN but not because we're already shutting down, then it must be fatal Review Comment: This should indeed be unreachable in the current implementation. In the fixed code, this exception is just a safeguard in case somebody changes the state transitions, which can break the invariants in this code. We don't want to silently ignore it. I'll convert it into an `IllegalStateException`, to make clear this is not expected. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) { timeoutMs = Long.MAX_VALUE; } - if (state.hasCompletedShutdown()) { Review Comment: We could keep it, but it would just duplicate cases. I think having fewer branches is better. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) { timeoutMs = Long.MAX_VALUE; } - if (state.hasCompletedShutdown()) { - log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); - return true; - } - if (state.isShuttingDown()) { - log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); - if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { - log.info("Streams client stopped to ERROR completely"); - return true; - } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { - log.info("Streams client stopped to NOT_RUNNING completely"); + if (!setState(State.PENDING_SHUTDOWN)) { + + if (state.isShuttingDown()) { + log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); + if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { + log.info("Streams client stopped to ERROR completely"); + return true; + } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { + log.info("Streams client stopped to NOT_RUNNING completely"); + return true; + } else { + log.warn("Streams client cannot transition to {} completely within the timeout", + state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR); + return false; + } + } + + if (state.hasCompletedShutdown()) { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org