[ 
https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946323#comment-16946323
 ] 

Guozhang Wang commented on KAFKA-8206:
--------------------------------------

[~ivanyu] Thanks for the PR. I think your approach is promising, but since it 
changes the default behavior of the consumer / producer clients, which is part 
of our public API (it is a semantics change), we should discuss its pros and 
cons under different scenarios. Could you drive the KIP discussion as well?

> A consumer can't discover new group coordinator when the cluster was partly 
> restarted
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8206
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0, 2.0.0, 2.2.0
>            Reporter: alex gabriel
>            Assignee: Ivan Yurchenko
>            Priority: Critical
>
> *A consumer can't discover new group coordinator when the cluster was partly 
> restarted*
> Preconditions:
> I use the Kafka server and the Java kafka-clients library, version 2.2.
> I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
> ZK node (localhost:2181).
> I have replication factor 2 for all my topics and 
> '_unclean.leader.election.enable=true_' on both Kafka nodes.
> Steps to reproduce:
> 1) Start 2 nodes (localhost:9092/localhost:9093)
> 2) Start a consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
> {noformat}
> // discovered group coordinator (0-node)
> 2019-04-09 16:23:18,963 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> // ...metadata cache is updated (2 nodes in the cluster list)
> 2019-04-09 16:23:18,928 DEBUG 
> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
> [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
> metadata request (type=MetadataRequest, topics=<ALL>) to node localhost:9092 
> (id: -1 rack: null)>
> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
> localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
> localhost:9092 (id: 0 rack: null))}>
> {noformat}
> 3) Shut down 1-node (localhost:9093)
> {noformat}
> // metadata was updated to version 4 (but for some reason it still had 2 
> alive nodes in the cluster)
> 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
> localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
> events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
> offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader 
> = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
> localhost:9092 (id: 0 rack: null))}>
> // the consumer thinks that node-1 is still alive, tries to send the 
> coordinator lookup to it, and fails
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
> coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
> invalid, will attempt rediscovery>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
> 2019-04-09 16:24:01,117 WARN 
> [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
> (localhost:9093) could not be established. Broker may not be available.>
> // refreshing metadata again
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, 
> apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 
> being disconnected>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Coordinator discovery failed, refreshing metadata>
> // metadata was updated to version 5, where the cluster has only 0-node 
> (localhost:9092), as expected
> 2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], 
> partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, 
> replicas = [0,1], isr = [0], offlineReplicas = [1]), Partition(topic = 
> events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0], 
> offlineReplicas = [1])], controller = localhost:9092 (id: 0 rack: null))}>
> // 0-node discovered as coordinator
> 2019-04-09 16:24:01,132 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> {noformat}
> *At this point, the consumer stores information only about 0-node 
> (localhost:9092) in the cluster property of its metadata cache.*
> 4) Shut down 0-node (localhost:9092)
> 5) Start 1-node (localhost:9093)
> {noformat}
> // the consumer tries to reconnect only to 0-node
> 2019-04-09 16:24:40,649 DEBUG 
> [org.apache.kafka.common.network.Selector.pollSelectionKeys] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Connection with 
> localhost disconnected>
> 2019-04-09 16:24:40,649 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 0 disconnected.>
> 2019-04-09 16:24:40,649 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 2147483647 
> disconnected.>
> 2019-04-09 16:24:40,649 DEBUG 
> [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=10, 
> clientId=events-consumer0, correlationId=209) due to node 0 being 
> disconnected>
> 2019-04-09 16:24:40,649 INFO 
> [org.apache.kafka.clients.FetchSessionHandler.handleError] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Error sending fetch 
> request (sessionId=1754516055, epoch=132) to node 0: 
> org.apache.kafka.common.errors.DisconnectException.>
> 2019-04-09 16:24:40,650 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
> coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or 
> invalid, will attempt rediscovery>
> 2019-04-09 16:24:40,650 DEBUG 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.lookupCoordinator]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] No 
> broker available to send FindCoordinator request>
> 2019-04-09 16:24:40,650 DEBUG 
> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
> [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Give up 
> sending metadata request since no node is available>
> {noformat}
> As far as I can see, the consumer tries to open a connection only to 0-node 
> because it is the only node in the consumer's cluster metadata cache. Thus the 
> consumer can't connect to the new group coordinator (1-node) and, as a result, 
> can't poll any messages.
> _To resolve this pending state, either 0-node must be restored OR the consumer 
> must be restarted (in which case the consumer discovers the most recently 
> started node, 1-node in my case, from the initial broker list)._
> P.S. The same behavior can be reproduced with a 3-node cluster.
> _The question: is it by design that the consumer's metadata cache stores 
> only the list of active nodes?_
> This leads to a situation where the last active node from the list goes down 
> and another node comes back up, but the consumer has no information about that 
> node and keeps trying to reconnect to the last active node, ignoring the new 
> one.
> From my point of view, a consumer should always keep the initial node list 
> and try to reconnect to the nodes from that list if none of the nodes in the 
> cached cluster metadata is alive.
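The discovery problem in the report, and the reporter's suggested fix, can be sketched with a small self-contained Java model. This is a hypothetical illustration, not the kafka-clients implementation. `CoordinatorDiscoveryModel`, `pickNode`, `replay` and the `fallbackToBootstrap` flag are invented names. "Reachable" brokers are modeled as a plain set of `host:port` strings rather than real connections. With the flag off, the model replays steps 2-5: the cache ends up holding only localhost:9092, so no node can be picked. With the flag on, it falls back to the initial `bootstrap.servers` list, as the reporter proposes.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical model of the node-selection problem described in KAFKA-8206.
 * NOT the kafka-clients implementation; all names here are made up.
 */
public class CoordinatorDiscoveryModel {

    private final List<String> bootstrapServers; // initial bootstrap.servers list
    private final boolean fallbackToBootstrap;    // reporter's proposal; false = reported behavior
    private List<String> cachedNodes;             // nodes from the latest metadata response

    CoordinatorDiscoveryModel(List<String> bootstrapServers, boolean fallbackToBootstrap) {
        this.bootstrapServers = List.copyOf(bootstrapServers);
        this.fallbackToBootstrap = fallbackToBootstrap;
        // Until the first metadata response arrives, only the bootstrap list is known.
        this.cachedNodes = this.bootstrapServers;
    }

    // In this model, each metadata response replaces the cached node list wholesale.
    void onMetadataResponse(List<String> nodesReportedByBroker) {
        this.cachedNodes = List.copyOf(nodesReportedByBroker);
    }

    // Choose a node for the next metadata / FindCoordinator request.
    Optional<String> pickNode(Set<String> reachable) {
        Optional<String> fromCache =
            cachedNodes.stream().filter(reachable::contains).findFirst();
        if (fromCache.isPresent() || !fallbackToBootstrap) {
            // An empty result corresponds to "No broker available to send FindCoordinator request".
            return fromCache;
        }
        // Proposed fallback: retry the initial list, as a restarted consumer would.
        return bootstrapServers.stream().filter(reachable::contains).findFirst();
    }

    // Replays steps 2-5 of the report and returns the node picked at the end.
    static Optional<String> replay(boolean fallbackToBootstrap) {
        CoordinatorDiscoveryModel client = new CoordinatorDiscoveryModel(
            List.of("localhost:9092", "localhost:9093"), fallbackToBootstrap);
        client.onMetadataResponse(List.of("localhost:9092", "localhost:9093")); // step 2
        client.onMetadataResponse(List.of("localhost:9092"));                   // step 3 (metadata version 5)
        return client.pickNode(Set.of("localhost:9093"));                       // steps 4-5: only 9093 is up
    }

    public static void main(String[] args) {
        System.out.println("reported behavior: " + replay(false)); // Optional.empty
        System.out.println("with fallback:     " + replay(true));  // Optional[localhost:9093]
    }
}
```

The sketch keeps the fallback behind a flag for a reason. Making it the default would change which brokers existing clients contact, and that semantics change is why Guozhang asks for a KIP discussion above.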



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
