[ https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Satish Duggana updated KAFKA-13331:
-----------------------------------
    Priority: Critical  (was: Major)

> Slow reassignments in 2.8 because of large number of
> UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)
> Events
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13331
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13331
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 2.8.1, 3.0.0
>            Reporter: GEORGE LI
>            Priority: Critical
>             Fix For: 3.1.0
>
>         Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Reassignments are slow on clusters with a large number of brokers
> (e.g. 80 brokers).
> After investigation, the slowness appears to come from the controller
> sending an UpdateMetadataRequest to every broker for each topic partition
> affected by the reassignment (some optimization may be possible here).
> For example, a reassignment with a batch size of 50 partitions generates
> about 10k - 20k ControllerEventManager.EventQueueSize, and the p99
> EventQueueTimeMs reaches 1M. With a batch size of 100 partitions, the
> EventQueueSize is about 40K and the p99 EventQueueTimeMs is 3M. See the
> screenshot of the metrics below.
> !Screen Shot 2021-09-28 at 12.57.34 PM.png!
> It takes about 10-30 minutes to process a batch of 100 reassignments, and
> 20-30 seconds for a batch of 1 reassignment, even when the topic partition
> is almost empty. In version 1.1, the reassignment is almost instant.
> Looking at the contents of the ControllerEventManager.EventQueue, the
> majority (depending on how many brokers are in the cluster, it can be 90%+) are
> {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),<broker.id>)}}
> events.
> This event type was introduced in this commit:
> {code}
> commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
> Author: Jason Gustafson <ja...@confluent.io>
> Date:   Thu Nov 21 07:41:29 2019 -0800
>     MINOR: Controller should log UpdateMetadata response errors (#7717)
>
>     Create a controller event for handling UpdateMetadata responses and log a
>     message when a response contains an error.
>
>     Reviewers: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>, Ismael
>     Juma <ism...@juma.me.uk>
> {code}
> Looking at how the ControllerEventManager processes the events for
> {{UpdateMetadata response}}, the handler only checks whether the response
> contains an error and, if so, logs it. However, it takes about 3ms - 60ms to
> dequeue each event. Because the queue is FIFO, every other event has to wait
> behind them.
> {code}
> private def processUpdateMetadataResponseReceived(updateMetadataResponse: UpdateMetadataResponse, brokerId: Int): Unit = {
>   if (!isActive) return
>   if (updateMetadataResponse.error != Errors.NONE) {
>     stateChangeLogger.error(s"Received error ${updateMetadataResponse.error} in UpdateMetadata " +
>       s"response $updateMetadataResponse from broker $brokerId")
>   }
> }
> {code}
> There might be more sophisticated logic for handling UpdateMetadata
> response errors in the future. For the current version, would it be better
> to check whether the response error code is Errors.NONE before putting the
> response into the Event Queue? For example, I added the check below and saw
> reassignment performance improve dramatically on the 80-broker cluster.
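To make the queueing effect of the proposal concrete, here is a minimal toy model. It is a sketch only: the `Event` types, `buildQueue`, and `waitMs` are hypothetical stand-ins, not Kafka's controller classes. It also assumes exactly one UpdateMetadata response per broker per reassigned partition and a constant per-event cost, so it understates the 10k - 20k queue sizes reported above.

```scala
import scala.collection.mutable

// Toy model only: hypothetical stand-ins, not Kafka's controller classes.
object QueueFilterSketch {
  sealed trait Event
  final case class UpdateMetadataResponseReceived(errorCode: Int, brokerId: Int) extends Event
  case object OtherControllerEvent extends Event

  /** Builds the FIFO queue for one batch: one response per broker per partition
    * (an assumption of this sketch), followed by one unrelated controller event
    * that has to wait behind them. With filterSuccess = true, responses with
    * errorCode 0 are never enqueued, mirroring the check proposed in the issue. */
  def buildQueue(brokers: Int,
                 partitions: Int,
                 failingBroker: Option[Int],
                 filterSuccess: Boolean): mutable.Queue[Event] = {
    val queue = mutable.Queue.empty[Event]
    for (_ <- 1 to partitions; b <- 0 until brokers) {
      val errorCode = if (failingBroker.contains(b)) 1 else 0
      if (!filterSuccess || errorCode != 0)
        queue.enqueue(UpdateMetadataResponseReceived(errorCode, b))
    }
    queue.enqueue(OtherControllerEvent)
    queue
  }

  /** Time the last event in the queue waits, assuming a constant cost per dequeued event. */
  def waitMs(queue: mutable.Queue[Event], perEventMs: Long): Long =
    (queue.size - 1).toLong * perEventMs
}
```

Under these assumptions, 80 brokers and a batch of 50 partitions put 4000 response events ahead of the next controller event; at the lower bound of 3 ms per event that is a 12 second wait. With the filter enabled and no failing broker, the next event does not wait at all.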
> {code}
> val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
>   controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
> sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
>   val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
>   // Additional check: only enqueue the response when it carries an error.
>   // Almost all responses (about 99.99%) have no error, so they are not
>   // added to the Event Queue.
>   if (updateMetadataResponse.error != Errors.NONE) {
>     sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))
>   }
> })
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)