Alexandre Dupriez created KAFKA-14845:
-----------------------------------------

             Summary: Broker ZNode creation can fail due to a session ID 
unknown to the broker
                 Key: KAFKA-14845
                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
             Project: Kafka
          Issue Type: Bug
            Reporter: Alexandre Dupriez
         Attachments: broker-registration.drawio

Our production environment hit a case where the registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is similar to the one fixed by KAFKA-6584 and is induced by the 
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.

A network partition disturbed communication channels between the Kafka and 
Zookeeper clusters for about 20% of the brokers in the cluster. One of these 
brokers was not able to re-register with Zookeeper and was excluded from the 
cluster until it was restarted. Broker logs show that the registration failed 
due to a "conflicting" znode write which, in this case, is not covered by 
KAFKA-6584. The broker did not restart and remained otherwise healthy. In the 
following logs, the broker IP is 1.2.3.4.

The sequence of logs on the broker is as follows.

First, a connection is established with the Zookeeper node 3.

 
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
 

An existing Zookeeper session had expired, and upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.

 
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
 

The client "session" timed out after 6 seconds. Note that the session ID is 0x0 
and that the "_Session establishment complete_" log line is absent: the broker 
appears to have never received or processed the response from the Zookeeper node.

 
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
 

Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.

 
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
 

A new connection was created with the Zookeeper node 1. Note that a valid (new) 
session ({{0x1006c6e0b830001}}) was reported by Kafka this time.

 
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
 

The Kafka ZK client was notified of the connection.

 
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
 

The broker then sent the request to create the znode {{/brokers/ids/18}}, which 
already existed, so the error path implemented for KAFKA-6584 was followed. 
In this case, however, the session owning the ephemeral node, 
{{0x300000043230ac1}} ({{216172783240153793}}), is different from the last 
active Zookeeper session which the broker has recorded. It is also 
different from the current session {{0x1006c6e0b830001}} 
({{72176813933264897}}), hence the recreation of the broker znode was not 
attempted.

 
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
        at 
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
        at kafka.controller.KafkaController.process(KafkaController.scala:1853)
        at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
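
As a rough illustration of why neither branch of the KAFKA-6584 error path applies here, the ownership check can be modelled as below. This is a simplified sketch in Python, not the actual {{KafkaZkClient}} code: the function name, the {{Outcome}} enum and the {{LAST_RECORDED}} value are hypothetical (the last session recorded by the broker is not given in this report). Only the two session IDs are taken from the logs above.

```python
from enum import Enum


class Outcome(Enum):
    OK = "znode already owned by the current session"
    RECREATE = "delete and re-create the znode (KAFKA-6584 path)"
    NODE_EXISTS = "fail with NodeExists"


def after_node_exists(owner: int, current: int, last_recorded: int) -> Outcome:
    """Simplified model of the decision taken when the znode already exists.

    Assumption: this mirrors the behaviour described in this report, it is
    not a copy of the Kafka implementation.
    """
    if owner == current:
        return Outcome.OK
    if owner == last_recorded:
        # The znode is a leftover of the previous session known to the broker.
        return Outcome.RECREATE
    # The owner is a session the broker has never heard of.
    return Outcome.NODE_EXISTS


# Session IDs from the logs (hex form; the error message prints them in decimal).
PHANTOM = 0x300000043230ac1
CURRENT = 0x1006c6e0b830001
# Hypothetical placeholder: the previously recorded session is not in the report.
LAST_RECORDED = 0x2000000000000001

outcome = after_node_exists(PHANTOM, CURRENT, LAST_RECORDED)
print(PHANTOM, CURRENT, outcome.name)
```

With the "phantom" owner, the model falls through to {{NODE_EXISTS}}, which matches the ERROR line logged by the broker.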
 

The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
server logs:

 
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)

{code}
The ephemeral znode is then deleted and never recreated. The broker is not 
registered. Only a broker restart (or a forced recreation of the Zookeeper 
session) can mitigate the problem at this point.

I fail to understand where the session {{0x300000043230ac1}} comes from. An 
analysis of the commit logs from Zookeeper shows the following sequence of 
transactions, with timestamps from the Zookeeper node.

- 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
- 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), 
SetData(<BrokerInfo>)]
- 2023-03-05T16:02:21.336Z: CloseSession [-11]
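
To make the ordering easier to follow, the broker log lines and the Zookeeper transactions quoted in this report can be merged into one timeline. The sketch below rests on two assumptions that the report does not confirm: that the broker logs are in UTC, and that the broker and Zookeeper clocks are comparable. The {{server_id}} helper additionally assumes that Zookeeper encodes the ID of the server which created the session in the top byte of the session ID.

```python
from datetime import datetime, timedelta


def ts(text: str) -> datetime:
    """Parse a timestamp with millisecond precision."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def server_id(session_id: int) -> int:
    """Top byte of the session ID (assumed to be the creating server's ID)."""
    return session_id >> 56


broker_events = [
    (ts("2023-03-05 16:01:55.342"), "broker", "socket connected to zk.3"),
    (ts("2023-03-05 16:02:01.343"), "broker", "timed out, sessionid 0x0"),
    (ts("2023-03-05 16:02:02.037"), "broker", "socket connected to zk.1"),
    (ts("2023-03-05 16:02:03.054"), "broker", "session 0x1006c6e0b830001 established"),
    (ts("2023-03-05 16:02:04.466"), "broker", "create fails with NodeExists"),
]
zk_events = [
    (ts("2023-03-05 16:02:00.973"), "zk", "session 0x300000043230ac1 created"),
    (ts("2023-03-05 16:02:02.163"), "zk", "Multi creates /brokers/ids/18"),
    (ts("2023-03-05 16:02:21.336"), "zk", "session 0x300000043230ac1 closed"),
]

# Merge both sources into a single chronological view.
timeline = sorted(broker_events + zk_events, key=lambda event: event[0])
for when, source, what in timeline:
    print(when.time().isoformat(timespec="milliseconds"), f"{source:6}", what)

# Delay between the session creation on Zookeeper and the client-side timeout.
gap_ms = (broker_events[1][0] - zk_events[0][0]) // timedelta(milliseconds=1)
print("session created", gap_ms, "ms before the client timed out")
print("phantom session server id:", server_id(0x300000043230ac1))
```

Under these assumptions, the session was created on the Zookeeper side shortly before the client gave up on the connection to zk.3, and the top byte of the "phantom" session ID points at server 3.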

I attempted to reproduce the case with a code-based example (available 
[here|https://github.com/Hangleton/kafka-tools/blob/master/src/main/java/kafka/repro/BrokerRegistrationTest.java]); 
however, it does not reproduce the occurrence of the "phantom" intermediate 
session. Instead, the test case forces a session renewal after the Multi 
transaction is processed for the first time. It essentially tries to produce 
the following sequence.

[^broker-registration.drawio]

Note the session renewal right after the response from the first Multi request 
is dropped. There is a race condition between the {{AdminZkClient}} and the 
underlying ZK client which is explicitly controlled in this test so that the 
{{AdminZkClient}} does not see the ZK client as disconnected.

The fix for KAFKA-6584 does not cover this case because here, the session ID is 
not surfaced to and recorded by the ZK client in Kafka (there were no 
lower-level logs to ascertain whether the Netty client ever received any 
response for that session).

As a remediation, perhaps the source of identity of the broker (currently 
conveyed via the Zookeeper session ID) could be explicitly added to the znode 
data. Assuming the Zookeeper Multi is atomic, the znode has the BrokerInfo (or 
any other user data provided with the SetData command) if and only if it was 
successfully created.
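
A minimal sketch of this idea follows, using an in-memory stand-in for Zookeeper. Everything in it is hypothetical and for illustration only: the {{FakeZk}} class, the {{register}} function and the {{incarnation}} field are not existing Kafka or Zookeeper APIs. The broker writes a per-process identifier into the znode data and, on NodeExists, compares that identifier instead of relying on the session ID alone.

```python
import uuid
from dataclasses import dataclass


@dataclass
class Znode:
    owner_session: int
    data: dict


class FakeZk:
    """In-memory stand-in for Zookeeper (hypothetical, illustration only)."""

    def __init__(self):
        self.nodes = {}

    def create_ephemeral(self, path, session, data):
        if path in self.nodes:
            return False  # stands in for NodeExists
        self.nodes[path] = Znode(session, dict(data))
        return True

    def delete(self, path):
        self.nodes.pop(path, None)


def register(zk, path, session, broker_info, incarnation):
    """Register a broker, using the znode data as the source of identity."""
    data = {**broker_info, "incarnation": incarnation}
    if zk.create_ephemeral(path, session, data):
        return "created"
    existing = zk.nodes[path]
    if existing.owner_session == session:
        return "already-registered"
    if existing.data.get("incarnation") == incarnation:
        # Written by this very broker under a session it never learned about.
        zk.delete(path)
        zk.create_ephemeral(path, session, data)
        return "recreated"
    return "conflict"


zk = FakeZk()
path = "/brokers/ids/18"
info = {"host": "1.2.3.4"}
incarnation = str(uuid.uuid4())

# The "phantom" session creates the znode, but the response never reaches the broker.
zk.create_ephemeral(path, 0x300000043230ac1, {**info, "incarnation": incarnation})

# The broker retries under its new session and recognises its own znode.
result = register(zk, path, 0x1006c6e0b830001, info, incarnation)
print(result, hex(zk.nodes[path].owner_session))
```

In a real implementation the delete and the create would presumably have to be guarded against races (for instance with a version check in a single Multi), which this sketch does not attempt.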



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
