[ https://issues.apache.org/jira/browse/KAFKA-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892027#comment-17892027 ]

kangning.li commented on KAFKA-17804:
-------------------------------------

Hi [~junrao], I have some thoughts on this issue.



The method {{completeDelayedOperationsWhenNotPartitionLeader}} currently checks 5 purgatories:
 # {{delayedProducePurgatory}}: produce-related, so it should be checked in both cases ("stop replica" and "become follower replica")
 # {{delayedFetchPurgatory}}: fetch-related
 # {{delayedRemoteFetchPurgatory}}: fetch-related
 # {{delayedRemoteListOffsetsPurgatory}}: should also be checked in both cases ("stop replica" and "become follower replica")
 # {{delayedShareFetchPurgatory}}: fetch-related
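
To make the classification above concrete, here is a minimal Scala sketch. The purgatories are modeled as plain case objects named after the {{ReplicaManager}} fields; {{PurgatoryKind}}, {{CheckDecision}}, {{decisionFor}} and {{allPurgatories}} are hypothetical names for illustration, not Kafka APIs.

```scala
// Hypothetical model of the five purgatories listed above; these are plain
// case objects named after the ReplicaManager fields, not Kafka types.
sealed trait PurgatoryKind
case object ProducePurgatory extends PurgatoryKind
case object FetchPurgatory extends PurgatoryKind
case object RemoteFetchPurgatory extends PurgatoryKind
case object RemoteListOffsetsPurgatory extends PurgatoryKind
case object ShareFetchPurgatory extends PurgatoryKind

// Hypothetical outcome of the classification.
sealed trait CheckDecision
// Checked on both "stop replica" and "become follower replica".
case object CheckInBothCases extends CheckDecision
// Decided per pending fetch, depending on where the fetch comes from.
case object DependsOnFetchOrigin extends CheckDecision

def decisionFor(p: PurgatoryKind): CheckDecision = p match {
  case ProducePurgatory | RemoteListOffsetsPurgatory                => CheckInBothCases
  case FetchPurgatory | RemoteFetchPurgatory | ShareFetchPurgatory => DependsOnFetchOrigin
}

val allPurgatories: List[PurgatoryKind] =
  List(ProducePurgatory, FetchPurgatory, RemoteFetchPurgatory, RemoteListOffsetsPurgatory, ShareFetchPurgatory)
```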

For the fetch-related purgatories, I think there are 2 cases:
 # Fetch from a follower replica: the target replica must be the leader, so the purgatory should be checked in both cases ("stop replica" and "become follower replica").
 # Fetch from a client: if the client configuration "client.rack" is set, we may not need to check the purgatory; if not, it should also be checked in both cases ("stop replica" and "become follower replica").
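
The two fetch cases can be written as a small decision function. This is a sketch under stated assumptions: {{FetchOrigin}}, {{ReplicaTransition}} and {{mustComplete}} are hypothetical names, and since the case analysis does not say what happens to a rack-aware client fetch on "stop replica", the sketch assumes it is always completed because the replica no longer exists on the broker.

```scala
// Hypothetical model of where a pending fetch came from (not a Kafka type).
sealed trait FetchOrigin
case object FollowerReplicaFetch extends FetchOrigin
final case class ClientFetch(clientRack: Option[String]) extends FetchOrigin

// Hypothetical model of why the local replica stopped being the leader.
sealed trait ReplicaTransition
case object StopReplica extends ReplicaTransition
case object BecomeFollower extends ReplicaTransition

// Whether a pending fetch must be completed when the local replica is no longer the leader.
def mustComplete(origin: FetchOrigin, transition: ReplicaTransition): Boolean =
  (origin, transition) match {
    // Assumption: a removed replica always completes its pending fetches.
    case (_, StopReplica)                       => true
    // Follower fetches target the leader, so they must be completed.
    case (FollowerReplicaFetch, BecomeFollower) => true
    // A client with "client.rack" set may keep fetching from a follower.
    case (ClientFetch(Some(_)), BecomeFollower) => false
    // Without "client.rack", treated like a leader-only fetch.
    case (ClientFetch(None), BecomeFollower)    => true
  }
```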


If the above thoughts are correct, we should not simply modify the method 
{{completeDelayedOperationsWhenNotPartitionLeader}} like this:
private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition,
                                                            topicId: Option[Uuid],
                                                            isStopReplica: Boolean): Unit = {
  val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
  delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
  delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
  // only check the fetch-related purgatories when the replica is stopped
  if (isStopReplica) {
    delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
    delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
    if (topicId.isDefined) delayedShareFetchPurgatory.checkAndComplete(
      new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition()))
  }
}

Instead, we should modify the purgatory-related code to check {{FetchParams}}: if the fetch comes 
from a client and "client.rack" is set, let the operation execute normally instead 
of calling {{forceComplete()}}. So we may need to modify {{DelayedFetch}}, 
{{DelayedRemoteFetch}} and {{DelayedShareFetch}}, and add some JUnit tests.
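
A simplified sketch of the proposed change to the delayed fetch operations: instead of force-completing every pending fetch once the local replica is no longer the leader, a client fetch with "client.rack" set keeps waiting for its normal completion condition. {{SketchDelayedFetch}}, {{FetchInfo}} and {{LocalReplicaState}} are hypothetical stand-ins, not Kafka's {{DelayedFetch}} or {{FetchParams}}; the minimum-bytes check is an assumed, simplified completion condition, and a stopped replica is assumed to always complete.

```scala
// Hypothetical stand-in for the information a delayed fetch would read from FetchParams.
final case class FetchInfo(fromFollower: Boolean, clientRack: Option[String]) {
  def isClientFetchWithRack: Boolean = !fromFollower && clientRack.isDefined
}

// Hypothetical state of the local replica when the purgatory is checked.
sealed trait LocalReplicaState
case object IsLeader extends LocalReplicaState
case object IsFollower extends LocalReplicaState
case object IsStopped extends LocalReplicaState

// Simplified model of a delayed fetch operation; NOT Kafka's DelayedFetch class.
final class SketchDelayedFetch(info: FetchInfo, minBytes: Int) {
  private var completed = false
  def isCompleted: Boolean = completed

  // Loosely modeled on forceComplete(): returns true only for the call that completes it.
  def forceComplete(): Boolean =
    if (completed) false
    else { completed = true; true }

  // Invoked when the purgatory is checked, e.g. after a leadership change.
  def tryComplete(state: LocalReplicaState, accumulatedBytes: Int): Boolean = state match {
    // Assumption: a removed replica always completes its pending fetches.
    case IsStopped => forceComplete()
    // Proposed: only fetches that are not rack-aware client fetches are force-completed.
    case IsFollower if !info.isClientFetchWithRack => forceComplete()
    // Normal completion condition (simplified): enough data has accumulated.
    case _ if accumulatedBytes >= minBytes => forceComplete()
    // Otherwise keep waiting in the purgatory.
    case _ => false
  }
}
```

The design choice here is that the fetch-related purgatories are still checked on "become follower replica", and each pending fetch decides for itself whether it must complete.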




I'm not sure if my thinking is correct. WDYT?

> optimize ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-17804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17804
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Jun Rao
>            Assignee: kangning.li
>            Priority: Minor
>
> Currently, ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader is 
> called when (1) a replica is removed from the broker and (2) a replica 
> becomes a follower replica and it checks the completion of multiple 
> purgatories.  However, not all purgatories need to be checked in both 
> situations. For example, the fetch purgatory doesn't need to be checked in 
> case (2) since we support fetch from follower now. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
