[ https://issues.apache.org/jira/browse/KAFKA-8882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927791#comment-16927791 ]
John Roesler commented on KAFKA-8882: ------------------------------------- Hey [~cjub], Thanks for the report. This does indeed not sound right to me. Can you check the logs and see what is reported for the state transitions? I.e., from this log: `log.info("State transition from {} to {}", oldState, newState)`. Also, can you check the logs to see if you fall into this case in close(): {noformat} if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped log.info("Already in the pending shutdown state, wait to complete shutdown"); {noformat} One side note, I would clean up _after_ it's not running instead of just before close. Note: you could actually just wait until close() returns, since it should never return until Streams is completely stopped: {noformat} # this is the end of the close() method. if (waitOnState(State.NOT_RUNNING, timeoutMs)) { log.info("Streams client stopped completely"); return true; } else { log.info("Streams client cannot stop completely within the timeout"); return false; } {noformat} Regards, -John > It's not possible to restart Kafka Streams using StateListener > -------------------------------------------------------------- > > Key: KAFKA-8882 > URL: https://issues.apache.org/jira/browse/KAFKA-8882 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.2.1 > Environment: Linux, Windows > Reporter: Jakub > Priority: Major > > Upon problems with connecting to a Kafka Cluster services using Kafka Streams > stop working with the following error message: > {code:java} > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors (...) > Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to > timeout > (...) > State transition from PENDING_SHUTDOWN to DEAD > (...) > All stream threads have died. The instance will be in error state and should > be closed. > {code} > > We tried to use a StateListener to automatically detect and work around this > problem. > However, this approach doesn't seem to work correctly: > # KafkaStreams correctly transitions from status Error to Pending Shutdown, > but then it stays in this status forever. > # Occasionally, after registering a listener the status doesn't even change > to Error. > > {code:java} > kafkaStreams.setStateListener(new StateListener() { > public void onChange(State stateNew, State stateOld) { > if (stateNew == State.ERROR) { > kafkaStreams.cleanUp(); > kafkaStreams.close(); > > } else if (stateNew == State.PENDING_SHUTDOWN) { > > // this message is displayed, and then nothig else > happens > LOGGER.info("State is PENDING_SHUTDOWN"); > > } else if (stateNew == State.NOT_RUNNING) { > // it never gets here > kafkaStreams = createKafkaStreams(); > kafkaStreams.start(); > } > } > }); > {code} > > Surprisingly, restarting KafkaStreams outside of a listener works fine. > I'm happy to provide more details if required. -- This message was sent by Atlassian Jira (v8.3.2#803003)