[ https://issues.apache.org/jira/browse/KAFKA-7786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736415#comment-16736415 ]

Jun Rao commented on KAFKA-7786:
--------------------------------

Great find, Anna. Regarding the fix, I am not sure that we need the protocol 
change. We know the leader epoch used in the OffsetsForLeaderEpoch request, so 
we can just pass that along with the OffsetsForLeaderEpoch response to 
maybeTruncate(). If the leaderEpoch in partitionStates has changed, we simply 
ignore the response and retry the OffsetsForLeaderEpoch request with the new 
leader epoch.
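
For illustration, a minimal sketch of that check (hypothetical names and types, 
not the actual ReplicaFetcherThread code): remember the leader epoch that was 
used to build the OffsetsForLeaderEpoch request and, when the response comes 
back, compare it with the epoch currently tracked in partitionStates before 
acting on the response.

    // Hypothetical sketch only; names do not match the real Kafka classes.
    object StaleEpochResponseSketch {
      case class PartitionFetchState(currentLeaderEpoch: Int)
      case class EpochEndOffset(errorCode: Short, endOffset: Long)

      // stands in for the fetcher thread's partitionStates
      var partitionStates = Map.empty[String, PartitionFetchState]

      def maybeTruncate(tp: String, requestedEpoch: Int, response: EpochEndOffset): Unit =
        partitionStates.get(tp) match {
          case Some(state) if state.currentLeaderEpoch != requestedEpoch =>
            // LeaderAndIsr already bumped the epoch while the request was in flight:
            // drop the stale response; the next round retries with the new epoch.
            println(s"Ignoring stale OffsetsForLeaderEpoch response for $tp " +
              s"(requested epoch $requestedEpoch, current epoch ${state.currentLeaderEpoch})")
          case Some(_) =>
            truncateTo(tp, response.endOffset) // epochs match: truncate as usual
          case None =>
            () // partition no longer fetched; nothing to do
        }

      private def truncateTo(tp: String, offset: Long): Unit =
        println(s"Truncating $tp to offset $offset")
    }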

 

> Fast update of leader epoch may stall partition fetching due to 
> FENCED_LEADER_EPOCH
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-7786
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7786
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Critical
>
> KIP-320/KAFKA-7395 added a FENCED_LEADER_EPOCH error response to an 
> OffsetsForLeaderEpoch request if the epoch in the request is lower than the 
> broker's leader epoch. ReplicaFetcherThread builds an OffsetsForLeaderEpoch 
> request under _partitionMapLock_, sends the request outside the lock, and 
> then processes the response under _partitionMapLock_. The broker may receive 
> LeaderAndIsr with the same leader but the next leader epoch, and remove and 
> re-add the partition to the fetcher thread (with the partition state 
> reflecting the updated leader epoch), all while the OffsetsForLeaderEpoch 
> request (with the old leader epoch) is still outstanding or waiting for the 
> lock to process the OffsetsForLeaderEpoch response. As a result, the 
> partition gets removed from partitionStates and this broker will not fetch 
> this partition until the next LeaderAndIsr, which may take a while. We will 
> see a log message like this:
> [2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current 
> leader. Will await the new LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaFetcherThread)
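> As a rough illustration of that timing window, here is a simplified, 
> self-contained sketch (made-up names, not the actual AbstractFetcherThread 
> code): the request is built under the lock, sent with the lock released, and 
> the response is processed under the lock again, by which time the epoch in 
> partitionStates may already have moved on.
>
>     import java.util.concurrent.locks.ReentrantLock
>
>     object EpochRaceSketch {
>       case class FetchState(leaderEpoch: Int)
>
>       private val partitionMapLock = new ReentrantLock()
>       // stands in for partitionStates; LeaderAndIsr handling may replace entries here
>       private var partitionStates = Map("test_topic-17" -> FetchState(leaderEpoch = 6))
>
>       private def inLock[T](body: => T): T = {
>         partitionMapLock.lock()
>         try body finally partitionMapLock.unlock()
>       }
>
>       def maybeTruncate(): Unit = {
>         // 1. Build the OffsetsForLeaderEpoch request under the lock.
>         val requestedEpochs =
>           inLock(partitionStates.map { case (tp, s) => tp -> s.leaderEpoch })
>
>         // 2. Send it outside the lock (placeholder for the network round trip).
>         //    While it is in flight, LeaderAndIsr can bump the epoch in partitionStates.
>         val responses = sendOffsetsForLeaderEpoch(requestedEpochs)
>
>         // 3. Process the response under the lock; a FENCED_LEADER_EPOCH here refers
>         //    to the old request epoch, not to what partitionStates contains by now.
>         inLock(handleEpochResponses(responses))
>       }
>
>       // stubs standing in for the network call and the response handling
>       private def sendOffsetsForLeaderEpoch(epochs: Map[String, Int]): Map[String, Int] = epochs
>       private def handleEpochResponses(responses: Map[String, Int]): Unit = ()
>     }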
> We saw this happen with 
> kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True.
>  This test does partition re-assignment while bouncing 2 out of 4 total 
> brokers. When the failure happened, each bounced broker was also a controller. 
> Because of the re-assignment, the controller updates the leader epoch without 
> changing the leader on controller change or on broker startup, so we see 
> several leader epoch changes without a leader change, which increases the 
> likelihood of the race condition described above.
> Here are the exact events that happened in this test (around the failure):
> We have 4 brokers: 1, 2, 3, and 4. Partition re-assignment is started for 
> test_topic-17: [2, 4, 1] -> [3, 1, 2]. At time t0, the leader of test_topic-17 
> is broker 2.
>  # clean shutdown of broker 3, which is also the controller
>  # broker 4 becomes the controller, continues re-assignment, and updates the 
> leader epoch for test_topic-17 to 6 (with the same leader)
>  # broker 2 (the leader of test_topic-17) receives the new leader epoch: 
> “test_topic-17 starts at Leader Epoch 6 from offset 1388. Previous Leader 
> Epoch was: 5”
>  # broker 3 is started again after clean shutdown
>  # the controller sees broker 3's startup and sends LeaderAndIsr(leader epoch 6) 
> to broker 3
>  # the controller updates the leader epoch to 7
>  # broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 
> 7: “test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader 
> Epoch was: 6”
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from the 
> controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6”, 
> and sends an OffsetsForLeaderEpoch request to broker 2
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from the 
> controller; it removes and re-adds the fetcher thread and executes 
> AbstractFetcherThread.addPartitions(), which updates the partition state with 
> leader epoch 7
>  # broker 3 receives FENCED_LEADER_EPOCH in response to 
> OffsetsForLeaderEpoch(leader epoch 6), because the leader received 
> LeaderAndIsr for leader epoch 7 before it got OffsetsForLeaderEpoch(leader 
> epoch 6) from broker 3. As a result, broker 3 removes the partition from 
> partitionStates and does not fetch until the controller updates the leader 
> epoch and sends LeaderAndIsr for this partition to broker 3. The test fails 
> because re-assignment does not finish in time (due to broker 3 not fetching).
>  
> One way to address this is possibly to add more state to PartitionFetchState. 
> However, that may introduce other race conditions. A cleaner way, I think, is 
> to return the leader epoch in the OffsetsForLeaderEpoch response with the 
> FENCED_LEADER_EPOCH error, and then ignore the error if the partition state 
> contains a higher leader epoch. The advantage is less state maintenance; the 
> disadvantage is that it requires bumping the inter-broker protocol.
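> For illustration only, here is a rough sketch of that alternative (hypothetical 
> names; the real change would require bumping the OffsetsForLeaderEpoch protocol 
> version, and it assumes the response echoes the leader epoch that was fenced):
>
>     // Hypothetical shapes only, not the real Kafka classes.
>     object FencedEpochCheckSketch {
>       // Response carrying FENCED_LEADER_EPOCH together with the leader epoch that
>       // was fenced, assuming the protocol were bumped to include it.
>       case class EpochEndOffset(errorCode: Short, fencedLeaderEpoch: Int, endOffset: Long)
>
>       // One reading of the check above: if the follower's partition state already
>       // has a higher epoch than the one that was fenced, treat the error as stale,
>       // keep the partition, and retry OffsetsForLeaderEpoch with the new epoch.
>       def shouldIgnoreFencedEpoch(response: EpochEndOffset, currentFollowerEpoch: Int): Boolean =
>         currentFollowerEpoch > response.fencedLeaderEpoch
>     }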



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
