[ https://issues.apache.org/jira/browse/KAFKA-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892027#comment-17892027 ]
kangning.li commented on KAFKA-17804:
-------------------------------------

Hi [~junrao], I have some thoughts on this issue.

The method {{completeDelayedOperationsWhenNotPartitionLeader}} currently checks 5 purgatories:
# {{delayedProducePurgatory}}: produce related, so it should be checked in both cases, "stop replica" and "become follower replica".
# {{delayedFetchPurgatory}}: fetch related.
# {{delayedRemoteFetchPurgatory}}: fetch related.
# {{delayedRemoteListOffsetsPurgatory}}: it should also be checked in both cases, "stop replica" and "become follower replica".
# {{delayedShareFetchPurgatory}}: fetch related.

About the fetch-related purgatories, I think there are 2 cases:
# Fetch from a follower replica: the target replica must be the leader, so the purgatory should be checked in both cases, "stop replica" and "become follower replica".
# Fetch from a client: if the client configuration "client.rack" is set, we may not need to check the purgatory; if it is not set, the purgatory should also be checked in both cases.

If the above thoughts are correct, we should not modify {{completeDelayedOperationsWhenNotPartitionLeader}} directly like this:
{code:scala}
private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition,
                                                            topicId: Option[Uuid],
                                                            isStopReplica: Boolean): Unit = {
  val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
  // Checked in both cases: stop replica and become follower replica
  delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
  delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
  // Checked only when the replica is stopped (removed from this broker)
  if (isStopReplica) {
    delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
    delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
    // The share fetch purgatory is keyed by topic ID, so skip it when the ID is unknown
    topicId.foreach { id =>
      delayedShareFetchPurgatory.checkAndComplete(
        new DelayedShareFetchPartitionKey(id, topicPartition.partition()))
    }
  }
}
{code}
Instead, we should modify the delayed operations in these purgatories to check {{FetchParams}}.
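To make the alternative more concrete, here is a minimal Scala sketch of the decision a pending fetch could make when {{completeDelayedOperationsWhenNotPartitionLeader}} checks it. It does not use Kafka's real classes. {{LeadershipLoss}}, {{PendingFetch}} and {{shouldForceComplete}} are hypothetical names, and {{PendingFetch}} is an assumed, simplified stand-in for the relevant parts of {{FetchParams}}. It encodes the cases above. A stopped replica completes every pending fetch. After the replica becomes a follower, only replication fetches and client fetches without "client.rack" are force-completed.

```scala
// Hypothetical model of the proposal; these are not Kafka's real classes.

// Why this broker stopped leading the partition.
sealed trait LeadershipLoss
case object StopReplica extends LeadershipLoss    // replica removed from the broker
case object BecomeFollower extends LeadershipLoss // replica became a follower

// Assumed, simplified stand-in for the relevant parts of FetchParams.
final case class PendingFetch(fromFollowerReplica: Boolean, clientRack: Option[String])

// Returns true if the pending fetch should be force-completed after leadership loss.
def shouldForceComplete(loss: LeadershipLoss, fetch: PendingFetch): Boolean =
  loss match {
    // The replica is gone from this broker, so no pending fetch can be served.
    case StopReplica => true
    // A replication fetch must target the leader, and a client fetch without
    // rack information is assumed to be leader-only. Both must be completed.
    // A client fetch with rack information is allowed to keep waiting.
    case BecomeFollower => fetch.fromFollowerReplica || fetch.clientRack.isEmpty
  }

// Usage: after becoming a follower, only the rack-aware client fetch keeps waiting.
val pending = Seq(
  PendingFetch(fromFollowerReplica = true, clientRack = None),
  PendingFetch(fromFollowerReplica = false, clientRack = None),
  PendingFetch(fromFollowerReplica = false, clientRack = Some("rack-a"))
)
val forced = pending.filter(fetch => shouldForceComplete(BecomeFollower, fetch))
// forced.size == 2 (the replication fetch and the client fetch without a rack)
```

The predicate captures only the case analysis. Wiring it into {{DelayedFetch}}, {{DelayedRemoteFetch}} and {{DelayedShareFetch}} is not shown. In Kafka, whether a consumer is actually served by a follower also depends on the broker's {{replica.selector.class}} setting, not on "client.rack" alone. A real check may therefore need more than the rack value.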
If the fetch comes from a client and "client.rack" is set, the delayed operation should keep executing normally instead of calling {{forceComplete()}}. So we may need to modify {{DelayedFetch}}, {{DelayedRemoteFetch}} and {{DelayedShareFetch}}, and add some JUnit tests.

I'm not sure if my thinking is correct. WDYT?

> optimize ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-17804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17804
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Jun Rao
>            Assignee: kangning.li
>            Priority: Minor
>
> Currently, ReplicaManager.completeDelayedOperationsWhenNotPartitionLeader is
> called when (1) a replica is removed from the broker and (2) a replica
> becomes a follower replica and it checks the completion of multiple
> purgatories. However, not all purgatories need to be checked in both
> situations. For example, the fetch purgatory doesn't need to be checked in
> case (2) since we support fetch from follower now.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)