[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544335#comment-16544335
 ] 

Guozhang Wang commented on KAFKA-6520:
--------------------------------------

Here is my current analysis of this issue:

When {{KafkaConsumer.poll}} is called, it calls 
{{updateAssignmentMetadataIfNeeded}}, which in turn calls 
{{coordinator.poll()}}. If the brokers are unavailable, that last call always 
returns false after the timeout, so {{KafkaConsumer.poll}} returns an empty set 
of records without any indication that the brokers are disconnected. In fact, 
if the brokers are unavailable from the very beginning, when the Streams 
application starts, we hit the same issue: the instance will also be in the 
{{RUNNING}} state, since {{onPartitionsRevoked}} is never called.

So I think that if we want to improve this situation, the change belongs in the 
consumer client, not the Streams client. That is, if you use a KafkaConsumer 
today to fetch data directly and the brokers become unavailable, you will 
observe the same scenario. On the other hand, it is by design that the consumer 
does not make users worry about broker connectivity or handle disconnects 
themselves. Here are my proposed options:

1. Expose metrics on the {{ClusterConnectionStates}} from 
Consumer#NetworkClient, and let users monitor these metrics to watch for 
connectivity issues.
2. Add a new state, say {{IDLE}}, to StreamThread: if {{consumer.poll}} returns 
nothing and, in the same iteration, the restore consumer also returns nothing, 
transition to {{IDLE}}. Only {{RUNNING}} can transition to {{IDLE}}, and 
{{IDLE}} can only transition to {{RUNNING}} (if the next poll call returns some 
data) or {{PARTITIONS_REVOKED}} (if the next poll call triggers a rebalance and 
the callback is invoked). For the global thread it is simpler: if one poll call 
returns nothing, transition to {{IDLE}}. KafkaStreams can also add a new 
{{IDLE}} state, which is entered from {{RUNNING}} if all of its threads 
(including the global thread) transition to {{IDLE}}.
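
To make option 2 concrete, here is a minimal, self-contained sketch of the 
transition rules as I read them. It is only a model, not the actual 
StreamThread or KafkaStreams code: {{ThreadState}}, {{IdleStateSketch}} and all 
method names are hypothetical, the revoked state is spelled 
{{PARTITIONS_REVOKED}} to match the existing StreamThread state name, and I am 
assuming that data from either the main or the restore consumer is enough to 
leave {{IDLE}}.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the proposed states; not the actual StreamThread code.
enum ThreadState { RUNNING, IDLE, PARTITIONS_REVOKED }

final class IdleStateSketch {

    // Next state of a stream thread after one iteration of its loop.
    //   mainRecords      - records returned by the main consumer's poll
    //   restoreRecords   - records returned by the restore consumer's poll
    //   rebalanceStarted - whether the rebalance callback fired during the poll
    static ThreadState nextThreadState(ThreadState current, int mainRecords,
                                       int restoreRecords, boolean rebalanceStarted) {
        // Only RUNNING and IDLE take part in the proposed transitions.
        if (current != ThreadState.RUNNING && current != ThreadState.IDLE) {
            return current;
        }
        if (rebalanceStarted) {
            return ThreadState.PARTITIONS_REVOKED;
        }
        // Assumption: data from either consumer counts as activity.
        boolean gotData = mainRecords > 0 || restoreRecords > 0;
        return gotData ? ThreadState.RUNNING : ThreadState.IDLE;
    }

    // The global thread is simpler: a single empty poll means IDLE.
    static ThreadState nextGlobalThreadState(int records) {
        return records > 0 ? ThreadState.RUNNING : ThreadState.IDLE;
    }

    // The instance is IDLE only if every thread (global thread included) is IDLE.
    static boolean instanceIsIdle(List<ThreadState> threadStates) {
        return !threadStates.isEmpty()
            && threadStates.stream().allMatch(s -> s == ThreadState.IDLE);
    }

    public static void main(String[] args) {
        // Brokers gone: both polls come back empty and no rebalance fires.
        ThreadState t = nextThreadState(ThreadState.RUNNING, 0, 0, false);
        ThreadState g = nextGlobalThreadState(0);
        System.out.println("stream thread: " + t + ", global thread: " + g);
        System.out.println("instance idle: " + instanceIsIdle(Arrays.asList(t, g)));
    }
}
```

Note that in this model a topic that simply has no new data also ends up in 
{{IDLE}}, so the state signals "nothing received" rather than "disconnected".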

Thoughts?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-6520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6520
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Michael Kohout
>            Assignee: Milind Jain
>            Priority: Major
>              Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



