[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-21 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367008#comment-17367008
 ] 

GEORGE LI commented on KAFKA-12971:
---

[~ijuma]  Flink 1.9 can use the 2.x kafka-clients, but Flink 1.4 cannot 
yet.

> Kafka 1.1.x clients cache broker hostnames, client stuck when host is 
> swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: GEORGE LI
>Priority: Major
> Fix For: 2.1.2
>
>
> There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a 
> bug in 0.11 with too-frequent consumer offset commits.  Due to the Flink 
> version, it cannot directly use the latest 2.x kafka-client version. 
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
> org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above messages after broker.id 425 had a 
> hardware failure and its host was swapped with a different hostname. 
> Comparing {{ClusterConnectionStates.connecting()}} across the 3 versions: 
> 0.11.0.3: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>             this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id,
>             connectionState.host(), host);
>     }
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} simply puts the node into the nodeState 
> HashMap, so a retry picks up the updated host.
> {{1.1.x}} adds "caching" logic, {{if (nodeState.containsKey(id))}}. However, 
> if the hostname of the broker.id is swapped/changed, it never reaches the 
> else block to update the NodeState with the new hostname.
> {{2.2.x}} adds the additional check {{if (connectionState != null && 
> connectionState.host().equals(host))}}; if the hostname changed, it calls 
> {{nodeState.put()}} to update the host.
> So it looks like the 1.1.x caching logic introduced a bug of not updating 
> the nodeState entry's host when it changes (e.g. a host failure, swapped 
> with a different hostname but using the same broker.id).
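The stale-cache behavior described in the quoted analysis can be illustrated with a minimal, self-contained model of the connection-state map. Note this is a hypothetical sketch, not the actual Kafka classes: the two `connecting*` methods below only model the caching decision, dropping the backoff and DNS-resolution details.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the 1.1.x vs 2.2.x host-caching decision in
// ClusterConnectionStates.connecting(); not the real Kafka internals.
public class ConnectingSketch {
    static class NodeState {
        final String host;
        NodeState(String host) { this.host = host; }
    }

    static final Map<String, NodeState> nodeState = new HashMap<>();

    // 1.1.x-style: any cached entry is reused, even if 'host' has changed.
    static void connecting11x(String id, String host) {
        if (!nodeState.containsKey(id)) {
            nodeState.put(id, new NodeState(host));
        }
        // else: cached entry kept; a swapped hostname is never picked up
    }

    // 2.2.x-style: the entry is replaced when the hostname differs.
    static void connecting22x(String id, String host) {
        NodeState state = nodeState.get(id);
        if (state != null && state.host.equals(host)) {
            return; // same host: reuse the cached state
        }
        nodeState.put(id, new NodeState(host)); // new node or changed host
    }

    public static void main(String[] args) {
        connecting11x("425", "old-host");
        connecting11x("425", "new-host");
        System.out.println("1.1.x keeps: " + nodeState.get("425").host);
        // prints "1.1.x keeps: old-host" -- the client stays stuck

        nodeState.clear();
        connecting22x("425", "old-host");
        connecting22x("425", "new-host");
        System.out.println("2.2.x stores: " + nodeState.get("425").host);
        // prints "2.2.x stores: new-host" -- reconnects to the swapped host
    }
}
```

Under this model, a broker.id whose host is swapped stays pinned to the old hostname in the 1.1.x variant, which matches the stuck-consumer symptom reported above.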



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17366015#comment-17366015
 ] 

Ismael Juma commented on KAFKA-12971:
-

Thanks for the report. 1.1.1 is no longer supported. Any reason why you cannot 
upgrade to 2.x?






[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17366011#comment-17366011
 ] 

GEORGE LI commented on KAFKA-12971:
---

This issue is fixed in the {{1.1.x}} kafka client by backporting the fix for 
KAFKA-7890 from 2.x, which invalidates the cache when the hostname changes. 



