[
https://issues.apache.org/jira/browse/KAFKA-9769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrew Choi updated KAFKA-9769:
-------------------------------
Labels: kafka replica replication (was: )
> ReplicaManager Partition.makeFollower Increases LeaderEpoch when ZooKeeper
> disconnect occurs
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-9769
> URL: https://issues.apache.org/jira/browse/KAFKA-9769
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Reporter: Andrew Choi
> Priority: Minor
> Labels: kafka, replica, replication
>
> The ZooKeeper session expired and got disconnected at roughly the same time
> the broker received the first LeaderAndIsr request. While the broker was
> processing this first LeaderAndIsr request, the ZooKeeper session had not
> yet been reestablished.
> Within the makeFollowers method, _partition.getOrCreateReplica_ is called
> before the fetcher is started. _partition.getOrCreateReplica_ needs to fetch
> information from ZooKeeper, but the call into the ZooKeeper client throws an
> exception because the session is invalid, so the fetcher start is skipped.
>
> The Partition class's getOrCreateReplica method calls AdminZkClient's
> fetchEntityConfig(..), which throws an exception if the ZooKeeper session is
> invalid.
>
> {code:java}
> val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
> {code}
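>
> For context, the control flow around this call in ReplicaManager.makeFollowers
> looks roughly like the sketch below (heavily condensed and illustrative, not
> the verbatim source; offset handling is elided and partitionsToStartFetching
> is a placeholder name). It shows why one partition's ZooKeeper failure skips
> the fetcher start for the whole batch:
> {code:java}
> // Condensed sketch of ReplicaManager.makeFollowers (illustrative, not verbatim):
> val partitionsToMakeFollower = mutable.Set.empty[Partition]
> try {
>   partitionStates.foreach { case (partition, partitionStateInfo) =>
>     // makeFollower calls getOrCreateReplica, which throws if the
>     // ZooKeeper session is invalid; the exception escapes the loop.
>     if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
>       partitionsToMakeFollower += partition
>   }
>   // Never reached when the exception above escapes, so no fetcher is
>   // started, even for partitions whose makeFollower already succeeded.
>   replicaFetcherManager.addFetcherForPartitions(partitionsToStartFetching)
> } catch {
>   case e: Throwable =>
>     stateChangeLogger.error("Error while making followers", e)
>     throw e
> }
> {code}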
>
> When this occurs, the leader epoch should not have been incremented while
> ZooKeeper was invalid, because once the second LeaderAndIsr request comes
> in, its leader epoch can be the same as the epoch the broker already
> recorded, and the epoch check (quoted at the end of this report) rejects the
> request as stale.
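> To make the collision concrete (epoch numbers are made up for illustration):
> suppose the first LeaderAndIsr request carries leaderEpoch = 5 and the broker
> records it before getOrCreateReplica throws. The controller's retry also
> carries leaderEpoch = 5, so the guard rejects it:
> {code:java}
> // Illustrative only; mirrors the becomeLeaderOrFollower guard quoted below.
> val currentLeaderEpoch = 5 // already recorded by the first, failed attempt
> val requestLeaderEpoch = 5 // the controller's retry carries the same epoch
> if (requestLeaderEpoch > currentLeaderEpoch)
>   println("epoch is newer: partition processed, fetcher started")
> else
>   println("5 > 5 is false: retry ignored, fetcher never started") // this branch runs
> {code}
>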
> A few options I can think of for a fix; I think the third route could be
> feasible:
> 1 - Make the LeaderEpoch update and the fetch update atomic.
> 2 - Wait until all individual partitions have been processed successfully,
> then start the fetch.
> 3 - Catch the ZooKeeper exception in the caller code block
> (ReplicaManager.makeFollowers) and simply do not touch the remaining
> partitions, so that the batch of partitions that succeeded up to that point
> is still updated and processed (fetch); see the sketch after this list.
> 4 - Or make the LeaderAndIsr request never arrive at the broker in case of a
> ZooKeeper disconnect; that would be safe because it is already possible
> for some replicas to receive the LeaderAndIsr request later than others.
> However, in that case, the code needs to make sure the controller will retry.
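>
> A rough sketch of option 3 (hypothetical and untested; names such as
> partitionsToStartFetching and the exact exception type caught are
> illustrative, not a tested patch):
> {code:java}
> val partitionsToMakeFollower = mutable.Set.empty[Partition]
> try {
>   partitionStates.foreach { case (partition, partitionStateInfo) =>
>     if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
>       partitionsToMakeFollower += partition
>   }
> } catch {
>   case e: ZooKeeperClientException =>
>     // Stop touching the remaining partitions. The partitions that already
>     // succeeded stay in partitionsToMakeFollower; the controller is expected
>     // to resend LeaderAndIsr for the rest once the session is reestablished.
>     stateChangeLogger.warn("ZooKeeper session invalid while making followers", e)
> }
> // Start fetchers only for the batch of partitions that succeeded so far.
> replicaFetcherManager.addFetcherForPartitions(partitionsToStartFetching)
> {code}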
>
> {code:java}
> // ReplicaManager.becomeLeaderOrFollower:
> else if (requestLeaderEpoch > currentLeaderEpoch) {
>   // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
>   // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
>   if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
>     partitionState.put(partition, stateInfo)
>   else
>     // ...
>
> // Partition.getOrCreateReplica:
> def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
>   allReplicasMap.getAndMaybePut(replicaId, {
>     if (isReplicaLocal(replicaId)) {
>       val adminZkClient = new AdminZkClient(zkClient)
>       val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
>       // ...
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)