[ 
https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13331:
-----------------------------------
    Priority: Critical  (was: Major)

> Slow reassignments in 2.8 because of large number of  
> UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)
>  Events
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13331
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13331
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 2.8.1, 3.0.0
>            Reporter: GEORGE LI
>            Priority: Critical
>             Fix For: 3.1.0
>
>         Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Slowness is observed when doing reassignments on clusters with more brokers 
> (e.g. 80 brokers). 
> After investigation, it looks like the slowness is because  for 
> reassignments, it sends the UpdateMetadataRequest to all the broker for every 
> topic partition affected by the reassignment (maybe some optimization can be 
> done).   e.g. 
> for a reassignment with batch size of 50 partitions.  it will generate about 
> 10k - 20k ControllerEventManager.EventQueueSize and  the p99 EventQueueTimeMs 
> will be  1M.   if the batch size is 100 partitions,   about 40K 
> EventQeuueSize and  3M p99 EventQueueTimeMs.   See below screen shot on the 
> metrics.  
>  !Screen Shot 2021-09-28 at 12.57.34 PM.png! 
> it takes about 10-30minutes to process 100 reassignments per batch.   and 
> 20-30 seconds for 1 reassignment per batch even the topic partition is almost 
> empty.   Version 1.1, the reassignment is almost instant. 
> Looking at what is in the ControllerEventManager.EventQueue,  the majority 
> (depends on the how many brokers in the cluster, it can be 90%+)  is  
> {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)}}
>  events.   which is introduced  in this commit: 
> {code}
> commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
> Author: Jason Gustafson <ja...@confluent.io>
> Date:   Thu Nov 21 07:41:29 2019 -0800
>     MINOR: Controller should log UpdateMetadata response errors (#7717)
>     
>     Create a controller event for handling UpdateMetadata responses and log a 
> message when a response contains an error.
>     
>     Reviewers: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>, Ismael 
> Juma <ism...@juma.me.uk>
> {code}
> Checking how the events in the ControllerEventManager are processed for  
> {{UpdateMetadata response}},  it seems like it's only checking whether there 
> is an error, and simply log the error.  but it takes about 3ms - 60ms  to 
> dequeue each event.  Because it's a FIFO queue,  other events were waiting in 
> the queue. 
> {code}
>   private def processUpdateMetadataResponseReceived(updateMetadataResponse: 
> UpdateMetadataResponse, brokerId: Int): Unit = {
>     if (!isActive) return
>     if (updateMetadataResponse.error != Errors.NONE) {
>       stateChangeLogger.error(s"Received error 
> ${updateMetadataResponse.error} in UpdateMetadata " +
>         s"response $updateMetadataResponse from broker $brokerId")
>     }
>   }
> {code}
> There might be more sophisticated logic for handling the UpdateMetadata 
> response error in the future.   For current version,   would it be better to 
> check whether  the response error code is  Errors.NONE before putting into 
> the Event Queue?   e.g.  I put this additional check and see the Reassignment 
> Performance increase dramatically on the 80 brokers cluster. 
> {code}
>      val updateMetadataRequestBuilder = new 
> UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
>         controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, 
> liveBrokers.asJava, topicIds.asJava)
>       sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) 
> => {
>         val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
>         if (updateMetadataResponse.error != Errors.NONE) {   //<============ 
> Add additional check whether the response code,   if no error, which is 
> almost 99.99% the case, skip adding updateMetadataResponse to the Event 
> Queue. 
>           sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, 
> broker))
>         }
>       })
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to