mjsax commented on code in PR #18765: URL: https://github.com/apache/kafka/pull/18765#discussion_r1940327370
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
-                return true;
+        if (!setState(State.PENDING_SHUTDOWN)) {
+            final State stateCopy = state;
+            if (stateCopy.isShuttingDown()) {
+                log.info("Skipping shutdown since Streams client is already in {}, waiting for a terminal state", stateCopy);
+                if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
+                    log.warn("Streams client did transition to a terminal state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
+                    return false;
+                }
+                log.info("Streams client stopped completely and transitioned to the terminal {} state", state);
+            } else if (stateCopy.hasCompletedShutdown()) {
+                log.info("Skipping shutdown since Streams client is already in the terminal {} state", stateCopy);

Review Comment:
I think the code is correct, but it is hard to reason about unless you think it through very carefully. If `stateCopy.isShuttingDown() == false`, we know we are in a terminal state (`NOT_RUNNING` or `ERROR`) and thus it should hold that `state == stateCopy`. We do start with `if (stateCopy.isShuttingDown())`, so using `else if (stateCopy.hasCompletedShutdown())` is consistent and easy to read.
However, for the log, we do log `log.info("Streams client stopped completely and transitioned to the terminal {} state", state);` above (which is correct, and using `stateCopy` would be incorrect), but it feels inconsistent with this log line, which uses `stateCopy` where we could use `state`. If we use `state` here, it raises the question of whether we should do `else if (state.hasCompletedShutdown())`, but this makes it inconsistent with `if (stateCopy.isShuttingDown())`.

Do I overthink this? -- My impression is, no matter what we do, something feels inconsistent...?

Idea: rewrite to make it easier to read:
```
final State stateCopy = state;
if (stateCopy.isShuttingDown()) {
    log.info("Skipping shutdown since Streams client is already in {}, waiting for a terminal state", stateCopy);
    if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
        log.warn("Streams client did not transition to a terminal state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
        return false;
    }
    log.info("Streams client stopped completely and transitioned to the terminal {} state", state);
    return true;
}
if (state.hasCompletedShutdown()) {
    log.info("Skipping shutdown since Streams client is already in the terminal {} state", state);
    return true;
}
throw new IllegalStateException("If transitioning to PENDING_SHUTDOWN fails, the state should be either in "
    + "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
```

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -308,7 +310,8 @@ private boolean waitOnState(final State targetState, final long waitMs) {
                     interrupted = true;
                 }
             } else {
-                log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+                log.debug("Cannot transit to {} within {}ms",
+                    Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining(" or ")), waitMs);

Review Comment:
Nit formatting. We usually either have a single line, or pass one parameter per line. Otherwise the code is very hard to read.
```
log.debug(
    "Cannot transit to {} within {}ms",
    Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining(" or ")),
    waitMs
);
```

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -347,8 +347,9 @@ private boolean setState(final State newState) {
             } else if (state == State.REBALANCING && newState == State.REBALANCING) {
                 // when the state is already in REBALANCING, it should not transit to REBALANCING again
                 return false;
-            } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
-                // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls)
+            } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR || newState == State.PENDING_SHUTDOWN)) {

Review Comment:
Ah. Bad code... using `if (state == State.PENDING_ERROR...` twice... Should we unify this, and have a single `if (state ==` for each state instead of multiple (or use a switch)?

There also seems to be some inconsistency with regard to "idempotent transitions", i.e., `oldState == newState`. For `NOT_RUNNING`, `ERROR`, `PENDING_ERROR`, `PENDING_SHUTDOWN`, and `REBALANCING` we cover it here (even if hard to decode...) and return `false`, if I read the code correctly. `CREATED -> CREATED` would be invalid and we fail (sounds right to me, as this should never happen). For `RUNNING -> RUNNING` we encode it as a "valid transition" and return `true`. Fine. But what is the (semantic) difference from `REBALANCING -> REBALANCING`, which is not considered valid (we return `false` for it)? Same for `PENDING_ERROR` and `PENDING_SHUTDOWN`? I guess `NOT_RUNNING` and `ERROR` fall into the `CREATED` category and should never happen (so why do we not fail for them? And if we do not want to fail, why return `false` and not just make them "valid transitions"?). I am confused.
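To make the "one branch per state (or a switch)" idea concrete, here is a rough, self-contained sketch. It is not the actual `KafkaStreams#setState` code: the class name `StateTransitionSketch`, the transition table in `validTargets`, and the decision to return `false` for every no-op request (including `RUNNING -> RUNNING`) are assumptions made only for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch, NOT the real KafkaStreams implementation.
final class StateTransitionSketch {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private State state = State.CREATED;

    // Assumed transition table: exactly one switch arm per current state.
    private static Set<State> validTargets(final State from) {
        switch (from) {
            case CREATED:
                return EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN);
            case REBALANCING:
                return EnumSet.of(State.RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR);
            case RUNNING:
                return EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN, State.PENDING_ERROR);
            case PENDING_SHUTDOWN:
                return EnumSet.of(State.NOT_RUNNING);
            case PENDING_ERROR:
                return EnumSet.of(State.ERROR);
            default:
                // NOT_RUNNING and ERROR are terminal: nothing to transition to.
                return EnumSet.noneOf(State.class);
        }
    }

    private static boolean isShuttingDownOrTerminal(final State s) {
        return s == State.PENDING_SHUTDOWN || s == State.PENDING_ERROR
            || s == State.NOT_RUNNING || s == State.ERROR;
    }

    // Returns true only if the state actually changed; throws on a transition
    // that should never be requested.
    synchronized boolean setState(final State newState) {
        if (newState == state) {
            // All idempotent requests are handled in one place (assumed policy).
            if (state == State.CREATED) {
                throw new IllegalStateException("Unexpected transition CREATED -> CREATED");
            }
            return false;
        }
        if (validTargets(state).contains(newState)) {
            state = newState;
            return true;
        }
        if (isShuttingDownOrTerminal(state)) {
            // Late requests (e.g. repeated close calls) are ignored, not failed.
            return false;
        }
        throw new IllegalStateException("Invalid transition " + state + " -> " + newState);
    }

    synchronized State state() {
        return state;
    }
}
```

With this shape, the answer to "what happens for `X -> X`?" lives in a single `if`, so whichever policy we pick for idempotent transitions is at least applied uniformly.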
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
-                return true;
+        if (!setState(State.PENDING_SHUTDOWN)) {
+            final State stateCopy = state;

Review Comment:
It might be worth adding a comment explaining why we do this; otherwise, somebody might remove it during "code cleanup". In the end, I think it's not just about logging, but also about `isShuttingDown()`: using a copy makes the call safe, whereas otherwise the call would be subject to a race? Should we call it `currentState` or `immutableState` or `stateCopyToCheckIsShuttingDownAtomically` instead?
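For illustration, this is roughly what I mean by a commented snapshot. It is a minimal sketch under assumptions: `StateSnapshotSketch`, `describe()`, and the local name `currentState` are hypothetical, and the enum only mirrors the two helper methods discussed here.

```java
// Hypothetical sketch, NOT the real KafkaStreams implementation.
final class StateSnapshotSketch {
    enum State {
        RUNNING, PENDING_SHUTDOWN, PENDING_ERROR, NOT_RUNNING, ERROR;

        boolean isShuttingDown() {
            return this == PENDING_SHUTDOWN || this == PENDING_ERROR;
        }

        boolean hasCompletedShutdown() {
            return this == NOT_RUNNING || this == ERROR;
        }
    }

    // Written by other threads, hence volatile.
    private volatile State state;

    StateSnapshotSketch(final State initial) {
        this.state = initial;
    }

    void setState(final State newState) {
        this.state = newState;
    }

    String describe() {
        // Read the volatile field exactly once. Every check and message below
        // then refers to the same value, even if another thread changes
        // `state` concurrently. Do not inline this local during cleanup.
        final State currentState = state;
        if (currentState.isShuttingDown()) {
            return "shutting down in " + currentState;
        }
        if (currentState.hasCompletedShutdown()) {
            return "terminal " + currentState;
        }
        return "active in " + currentState;
    }
}
```

The comment on the local is the part I care about; the name matters less as long as it signals that the value is a point-in-time snapshot.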