[ 
https://issues.apache.org/jira/browse/KAFKA-8882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927791#comment-16927791
 ] 

John Roesler commented on KAFKA-8882:
-------------------------------------

Hey [~cjub],

Thanks for the report. This does indeed not sound right to me. Can you check 
the logs and see what is reported for the state transitions? I.e., from this 
log: `log.info("State transition from {} to {}", oldState, newState)`.

Also, can you check the logs to see if you fall into this case in close():

{noformat}
        if (!setState(State.PENDING_SHUTDOWN)) {
            // if transition failed, it means it was either in PENDING_SHUTDOWN
            // or NOT_RUNNING already; just check that all threads have been 
stopped
            log.info("Already in the pending shutdown state, wait to complete 
shutdown");
{noformat}

One side note, I would clean up _after_ it's not running instead of just before 
close.

Note: you could actually just wait until close() returns, since it should never 
return until Streams is completely stopped:

{noformat}
# this is the end of the close() method.
        if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
            log.info("Streams client stopped completely");
            return true;
        } else {
            log.info("Streams client cannot stop completely within the 
timeout");
            return false;
        }
{noformat}

Regards,
-John


> It's not possible to restart Kafka Streams using StateListener
> --------------------------------------------------------------
>
>                 Key: KAFKA-8882
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8882
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.1
>         Environment: Linux, Windows
>            Reporter: Jakub
>            Priority: Major
>
> Upon problems with connecting to a Kafka Cluster services using Kafka Streams 
> stop working with the following error message:
> {code:java}
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors (...)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout
> (...)
> State transition from PENDING_SHUTDOWN to DEAD
> (...)
> All stream threads have died. The instance will be in error state and should 
> be closed.
> {code}
>  
> We tried to use a StateListener to automatically detect and work around this 
> problem. 
>  However, this approach doesn't seem to work correctly:
>  # KafkaStreams correctly transitions from status Error to Pending Shutdown, 
> but then it stays in this status forever.
>  # Occasionally, after registering a listener the status doesn't even change 
> to Error.
>  
> {code:java}
> kafkaStreams.setStateListener(new StateListener() {
>       public void onChange(State stateNew, State stateOld) {
>               if (stateNew == State.ERROR) {
>                       kafkaStreams.cleanUp();
>                       kafkaStreams.close();
>                       
>               } else if (stateNew == State.PENDING_SHUTDOWN) {                
>         
>                       // this message is displayed, and then nothig else 
> happens
>                       LOGGER.info("State is PENDING_SHUTDOWN");
>                       
>               } else if (stateNew == State.NOT_RUNNING) {
>                       // it never gets here
>                       kafkaStreams = createKafkaStreams();
>                       kafkaStreams.start();
>               }
>       }
> });
> {code}
>  
> Surprisingly, restarting KafkaStreams outside of a listener works fine.
>  I'm happy to provide more details if required.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to