[jira] [Commented] (KAFKA-13331) Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events
[ https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17423761#comment-17423761 ] GEORGE LI commented on KAFKA-13331:
---
Besides the slow reassignments, when the EventQueue is long and full of UpdateMetadataResponseReceived events, the cluster also misbehaves: some RPC calls are slow or time out. For example, {{kafka-topics.sh --bootstrap-server}} times out as shown below, because it puts {{listPartitionReassignments}} in the EventQueue. The old {{kafka-topics.sh --zookeeper}} path returned quickly.
{code}
$ /usr/lib/kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic --describe
Error while executing topic command : Call(callName=listPartitionReassignments, deadlineMs=1632339660352, tries=1, nextAllowedTryMs=1632339660520) timed out at 1632339660420 after 1 attempt(s)
[2021-09-22 19:41:00,425] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listPartitionReassignments, deadlineMs=1632339660352, tries=1, nextAllowedTryMs=1632339660520) timed out at 1632339660420 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled listPartitionReassignments request with correlation id 9 due to node 10043 being disconnected
 (kafka.admin.TopicCommand$)
{code}
In our environment, any RPC call that goes through the EventQueue is affected during reassignments (batch size 50+).

> Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events
>
>
> Key: KAFKA-13331
> URL: https://issues.apache.org/jira/browse/KAFKA-13331
> Project: Kafka
> Issue Type: Bug
> Components: admin, controller, core
> Affects Versions: 2.8.1, 3.0.0
> Reporter: GEORGE LI
> Priority: Critical
> Fix For: 3.1.0
>
> Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Slowness is observed when doing reassignments on clusters with many brokers (e.g. 80 brokers).
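The timeouts follow from head-of-line blocking in a single FIFO queue. A back-of-the-envelope sketch, assuming one event is dequeued at a time at a fixed cost (a simplification; the report gives 3 ms to 60 ms per event): a request enqueued behind N events waits roughly N times that cost. The class and method names, and the 30-second deadline, are hypothetical and not Kafka settings.

{code:java}
import java.time.Duration;

/** Back-of-the-envelope model of head-of-line blocking in a single-threaded FIFO queue. */
final class QueueWaitEstimate {
    private QueueWaitEstimate() {}

    /** Wait before processing when queuedAhead events each cost perEventCost to dequeue. */
    static Duration waitTime(long queuedAhead, Duration perEventCost) {
        return perEventCost.multipliedBy(queuedAhead);
    }

    /** True if a request enqueued behind queuedAhead events would outlive its deadline. */
    static boolean timesOut(long queuedAhead, Duration perEventCost, Duration deadline) {
        return waitTime(queuedAhead, perEventCost).compareTo(deadline) > 0;
    }

    public static void main(String[] args) {
        Duration deadline = Duration.ofSeconds(30); // hypothetical client deadline
        long[] depths = {10_000, 20_000, 40_000};   // EventQueueSize figures from the report
        long[] costsMs = {3, 60};                   // per-event dequeue cost range from the report
        for (long depth : depths) {
            for (long costMs : costsMs) {
                Duration cost = Duration.ofMillis(costMs);
                System.out.printf("depth=%d cost=%dms wait=%ds timesOut=%b%n",
                        depth, costMs, waitTime(depth, cost).getSeconds(), timesOut(depth, cost, deadline));
            }
        }
    }
}
{code}

At 20k queued events and 60 ms per event, the estimate is 1,200 s (1.2M ms). That is the same order of magnitude as the p99 EventQueueTimeMs figures in the issue description.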
[jira] [Created] (KAFKA-13331) Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events
GEORGE LI created KAFKA-13331:
-
Summary: Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events
Key: KAFKA-13331
URL: https://issues.apache.org/jira/browse/KAFKA-13331
Project: Kafka
Issue Type: Bug
Components: admin
Affects Versions: 3.0.0, 2.8.1
Reporter: GEORGE LI
Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png

Slowness is observed when doing reassignments on clusters with many brokers (e.g. 80 brokers).

After investigation, the slowness appears to come from the reassignment sending an UpdateMetadataRequest to all brokers for every topic partition affected by the reassignment (some optimization may be possible here). For example, a reassignment with a batch size of 50 partitions generates a ControllerEventManager.EventQueueSize of about 10k-20k, and the p99 EventQueueTimeMs reaches about 1M. With a batch size of 100 partitions, EventQueueSize reaches about 40K and the p99 EventQueueTimeMs about 3M. See the screenshot of the metrics below.

!Screen Shot 2021-09-28 at 12.57.34 PM.png!

It takes about 10-30 minutes to process 100 reassignments per batch, and 20-30 seconds for 1 reassignment per batch, even when the topic partition is almost empty. In version 1.1, the reassignment is almost instant.

Looking at what is in the ControllerEventManager.EventQueue, the majority of the events (depending on how many brokers are in the cluster, it can be 90%+) are {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),)}} events, which were introduced in this commit:
{code}
commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
Author: Jason Gustafson
Date:   Thu Nov 21 07:41:29 2019 -0800

    MINOR: Controller should log UpdateMetadata response errors (#7717)

    Create a controller event for handling UpdateMetadata responses and log a
    message when a response contains an error.

    Reviewers: Stanislav Kozlovski, Ismael Juma
{code}
Looking at how the ControllerEventManager processes the {{UpdateMetadata response}} events, it only checks whether there is an error and logs it. Yet dequeuing each event takes about 3 ms to 60 ms, and because it is a FIFO queue, all other events wait behind them.
{code}
private def processUpdateMetadataResponseReceived(updateMetadataResponse: UpdateMetadataResponse, brokerId: Int): Unit = {
  if (!isActive) return

  if (updateMetadataResponse.error != Errors.NONE) {
    stateChangeLogger.error(s"Received error ${updateMetadataResponse.error} in UpdateMetadata " +
      s"response $updateMetadataResponse from broker $brokerId")
  }
}
{code}
There might be more sophisticated logic for handling UpdateMetadata response errors in the future. For the current version, would it be better to check whether the response error code is Errors.NONE before putting the event into the EventQueue? For example, I added this check and saw reassignment performance increase dramatically on the 80-broker cluster.
{code}
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
  controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
  val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
  // Additional check: if the response has no error (almost 99.99% of the time),
  // skip adding the UpdateMetadataResponseReceived event to the EventQueue.
  if (updateMetadataResponse.error != Errors.NONE) {
    sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))
  }
})
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
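The effect of the proposed check can be modelled outside Kafka. This sketch assumes a simplified controller in which every UpdateMetadata response callback may enqueue one event into a single FIFO queue, with one request round per partition sent to every broker (real reassignments go through more steps). {{ControllerQueueModel}}, its nested {{Response}} class, and the error-code convention (0 means no error, as in {{UpdateMetadataResponseData(errorCode=0)}}) are hypothetical stand-ins, not Kafka classes.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of the controller event queue; not Kafka's ControllerEventManager. */
final class ControllerQueueModel {
    /** Simplified UpdateMetadata response: errorCode 0 means no error. */
    static final class Response {
        final int brokerId;
        final short errorCode;

        Response(int brokerId, short errorCode) {
            this.brokerId = brokerId;
            this.errorCode = errorCode;
        }
    }

    private final Queue<Response> eventQueue = new ArrayDeque<>();
    private final boolean filterSuccesses;

    ControllerQueueModel(boolean filterSuccesses) {
        this.filterSuccesses = filterSuccesses;
    }

    /** Response callback: the filter runs here, before anything reaches the FIFO queue. */
    void onUpdateMetadataResponse(Response response) {
        if (filterSuccesses && response.errorCode == 0) {
            return; // nothing to log, so the event never occupies a queue slot
        }
        eventQueue.add(response);
    }

    int queueSize() {
        return eventQueue.size();
    }

    /** One UpdateMetadata round per partition, sent to every broker (a simplification). */
    static int simulate(boolean filterSuccesses, int partitions, int brokers, int failingBroker) {
        ControllerQueueModel model = new ControllerQueueModel(filterSuccesses);
        for (int p = 0; p < partitions; p++) {
            for (int b = 0; b < brokers; b++) {
                short error = (short) (b == failingBroker ? 1 : 0); // any nonzero code stands in for an error
                model.onUpdateMetadataResponse(new Response(b, error));
            }
        }
        return model.queueSize();
    }

    public static void main(String[] args) {
        // 50 partitions x 80 brokers, with broker 7 returning an error on every response.
        System.out.println("unfiltered: " + simulate(false, 50, 80, 7)); // unfiltered: 4000
        System.out.println("filtered:   " + simulate(true, 50, 80, 7));  // filtered:   50
    }
}
{code}

Filtering in the callback rather than in the event handler is what matters: the handler already skips successful responses, but by then each event has already paid its place in the queue.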
[jira] [Commented] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
[ https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367008#comment-17367008 ] GEORGE LI commented on KAFKA-12971:
---
[~ijuma] Flink 1.9 can use the 2.x kafka-clients, but Flink 1.4 cannot yet.

> Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.0, 1.1.1, 1.1.2
> Reporter: GEORGE LI
> Priority: Major
> Fix For: 2.1.2
>
>
> The kafka-client version was upgraded from 0.11 to 1.1.x to fix a bug in 0.11 with too-frequent consumer offset commits. Because of the Flink version in use, it cannot directly use the latest 2.x kafka-client version.
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck logging the message above after broker.id 425 had a hardware failure and was swapped to a host with a different hostname.
> Comparing {{ClusterConnectionStates.connecting()}} across the 3 versions:
> 0.11.0.3:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>             this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
>
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> In {{0.11.0.3}}, {{connecting()}} simply puts a new entry for the node into the nodeState HashMap, so every retry uses the updated host.
> {{1.1.x}} adds "caching" logic: {{if (nodeState.containsKey(id))}} reuses the existing entry. However, if the hostname of the broker.id is swapped/changed, the code never reaches the else block, so the NodeState is never updated with the new hostname.
> {{2.2.x}} adds the check {{if (connectionState != null && connectionState.host().equals(host))}}; if the hostname changed, it calls {{nodeState.put()}} to update the host.
> So the 1.1.x caching logic appears to have introduced a bug: the nodeState host is not updated when it changes (e.g. after a host failure, the broker moves to a host with a different hostname but keeps the same broker.id).
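The difference between the 1.1.x and 2.2.x versions of {{connecting()}} can be reproduced with a stripped-down model that keeps only the node-id-to-host map. {{ConnectionStatesModel}} and its methods are hypothetical; the sketch omits reconnect backoff, DNS resolution, and address rotation from the real {{ClusterConnectionStates}}, and its log line only imitates the 2.2.x message. The hostnames are made up.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, stripped-down model of ClusterConnectionStates.connecting(). */
final class ConnectionStatesModel {
    private final Map<String, String> hostByNodeId = new HashMap<>();
    private final boolean checkHostname; // false = 1.1.x behaviour, true = 2.2.x behaviour

    ConnectionStatesModel(boolean checkHostname) {
        this.checkHostname = checkHostname;
    }

    void connecting(String id, String host) {
        String cachedHost = hostByNodeId.get(id);
        if (cachedHost != null && (!checkHostname || cachedHost.equals(host))) {
            // 1.1.x: any cached entry is reused, even if the broker moved to a new host.
            // 2.2.x: the entry is reused only when the hostname is unchanged.
            return;
        }
        if (cachedHost != null) {
            System.out.printf("Hostname for node %s changed from %s to %s.%n", id, cachedHost, host);
        }
        hostByNodeId.put(id, host); // create or replace the state with the current hostname
    }

    String hostFor(String id) {
        return hostByNodeId.get(id);
    }

    public static void main(String[] args) {
        for (boolean check : new boolean[] {false, true}) {
            ConnectionStatesModel states = new ConnectionStatesModel(check);
            states.connecting("425", "old-host");
            states.connecting("425", "new-host"); // broker.id 425 swapped to a new machine
            System.out.println((check ? "2.2.x" : "1.1.x") + " connects to " + states.hostFor("425"));
        }
    }
}
{code}

In the 1.1.x mode the client keeps dialing {{old-host}} indefinitely, which matches the stuck consumers described above; the 2.2.x mode replaces the entry on the first attempt with the new hostname.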
[jira] [Commented] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
[ https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17366011#comment-17366011 ] GEORGE LI commented on KAFKA-12971:
---
This issue is fixed in the {{1.1.x}} kafka client by back porting the fix in KAFKA-7890 from 2.x to invalidate the cache when the hostname is changed.
[jira] [Created] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
GEORGE LI created KAFKA-12971:
-
Summary: Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
Key: KAFKA-12971
URL: https://issues.apache.org/jira/browse/KAFKA-12971
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 1.1.1, 1.1.0, 1.1.2
Reporter: GEORGE LI
Fix For: 2.1.2
[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220243#comment-17220243 ] GEORGE LI commented on KAFKA-8733:
--
For lossy clusters, setting unclean.leader.election.enable=true will help. We are also rolling out replica.lag.time.max.ms=3 .

> Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.2, 2.4.0
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We hit offline partitions multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever read/write operations on that disk take longer than usual. In particular, when a read takes longer than replica.lag.time.max.ms, follower replicas fall out of sync: their earlier fetch requests are stuck reading the local log, and their fetch status is not yet updated, as shown in the `ReplicaManager` code below. If reading data from the log takes longer than replica.lag.time.max.ms, all the replicas fall out of sync, and the partition becomes offline if min.insync.replicas > 1 and unclean.leader.election.enable is false.
>
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
>     replicaId = replicaId,
>     fetchOnlyFromLeader = fetchOnlyFromLeader,
>     readOnlyCommitted = fetchOnlyCommitted,
>     fetchMaxBytes = fetchMaxBytes,
>     hardMaxBytesLimit = hardMaxBytesLimit,
>     readPartitionInfo = fetchInfos,
>     quota = quota,
>     isolationLevel = isolationLevel)
>   // Fetch time gets updated here, but maybeShrinkIsr may already have been called
>   // and removed the replica from the ISR.
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> }
>
> val logReadResults = readFromLog()
> {code}
> Attached are graphs of the disk weighted IO time stats from when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options for handling this scenario.
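A simplified timing model shows why a slow read causes the shrink: the leader judges a follower by its last caught-up time, and, per the {{readFromLog}} snippet above, that state is only refreshed after {{readFromLocalLog}} returns. The sketch assumes a single follower, a plain millisecond clock, and a hypothetical 10 s lag limit; {{IsrLagModel}} and its methods are hypothetical and do not reproduce {{ReplicaManager}}.

{code:java}
/** Hypothetical model of leader-side follower lag tracking; not Kafka's ReplicaManager. */
final class IsrLagModel {
    private final long replicaLagTimeMaxMs;
    private long lastCaughtUpTimeMs;

    IsrLagModel(long replicaLagTimeMaxMs, long caughtUpAtMs) {
        this.replicaLagTimeMaxMs = replicaLagTimeMaxMs;
        this.lastCaughtUpTimeMs = caughtUpAtMs;
    }

    /** Called only after the local-log read for a follower fetch has returned. */
    void onFetchReadCompleted(long nowMs) {
        lastCaughtUpTimeMs = nowMs;
    }

    /** Periodic ISR check run by the leader, independently of in-flight fetches. */
    boolean shouldShrink(long nowMs) {
        return nowMs - lastCaughtUpTimeMs > replicaLagTimeMaxMs;
    }

    public static void main(String[] args) {
        long lagMaxMs = 10_000;        // hypothetical replica.lag.time.max.ms
        IsrLagModel follower = new IsrLagModel(lagMaxMs, 0);
        long fetchArrivesMs = 100;     // follower sends its next fetch promptly
        long diskReadMs = 15_000;      // slow disk: the read outlasts replica.lag.time.max.ms
        long checkMs = 10_500;         // shrink check fires while the read is still stuck
        System.out.println("shrink at " + checkMs + "ms: " + follower.shouldShrink(checkMs));
        long readDoneMs = fetchArrivesMs + diskReadMs;
        follower.onFetchReadCompleted(readDoneMs);
        System.out.println("after read completes: " + follower.shouldShrink(readDoneMs));
    }
}
{code}

The follower did nothing wrong: it fetched at 100 ms, but the check at 10,500 ms only sees the caught-up time from 0 ms. When every follower is stuck behind the same slow disk, all of them fail the check together, which is how the ISR collapses to the leader.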
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111858#comment-17111858 ] GEORGE LI commented on KAFKA-8638:
--
[~hai_lin] Some of the recent activity on KIP-491 is in KAFKA-4084, where I posted a patch for version 2.4 (and 1.1) with an installation guide.

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
> Issue Type: Improvement
> Components: config, controller, core
> Affects Versions: 1.1.1, 2.3.0, 2.2.1
> Reporter: GEORGE LI
> Assignee: GEORGE LI
> Priority: Major
>
>
> Currently, Kafka preferred leader election picks the broker_id from the topic/partition replica assignment in priority order, provided the broker is in the ISR. The preferred leader is the broker id in the first position of the replica list. There are use cases where, even though the first broker in the replica assignment is in the ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election.
> Let's use topic/partition replicas (1,2,3) as an example. 1 is the preferred leader. When preferred leader election runs, it picks 1 as the leader if 1 is in the ISR; if 1 is not online and in the ISR, it picks 2; if 2 is not in the ISR either, it picks 3. There are use cases where, even if 1 is in the ISR, we would like it moved to the end of the ordering (lowest priority) during preferred leader election. Below is a list of use cases:
> * If broker_id 1 is a swapped failed host brought up with only the last segments or the latest offset and no historical data (there is another effort on this), it is better for it not to serve leadership until it has caught up.
> * The cross-data-center cluster has AWS instances with less computing power than the on-prem bare-metal machines. We could put the AWS broker_ids in the Preferred Leader Blacklist so that on-prem brokers are elected leaders, without changing the ordering of the replica assignments.
> * If broker_id 1 keeps losing leadership after some time ("flapping"), we would want to exclude 1 from leadership unless all other brokers of this topic/partition are offline. The flapping effect was seen in the past when 2 or more brokers were bad: as they lost leadership constantly and quickly, the partitions whose replica sets included them saw leadership change constantly. The ultimate solution is to swap these bad hosts, but for quick mitigation we can also put them in the Preferred Leader Blacklist to move their priority for leadership to the lowest.
> * If the controller is busy serving an extra load of metadata requests and other tasks, we would like to move the controller's leaderships to other brokers to lower its CPU load. Currently, bouncing the broker to make it lose leadership does not work for the controller, because after the bounce the controller fails over to another broker.
> * Avoid bouncing a broker to make it lose leadership: it would be good to have a way to specify which broker should be excluded from serving traffic/leadership (without changing the replica assignment ordering by reassignments, even though that is quick) and then run preferred leader election. A bounced broker causes temporary URP, and sometimes other issues. Also, a bounced broker (e.g. broker_id 1) can temporarily lose all its leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, some of its leaderships will likely fail over to broker_id 1 for a partition with 3 replicas. If broker_id 1 is in the blacklist, then in that scenario, even with broker_id 2 offline, the 3rd broker can take leadership.
> The current workaround is to reassign the topic/partition's replicas so that broker_id 1 moves from the first position to the last, e.g. (1, 2, 3) => (2, 3, 1), and then run preferred leader election. This changes the replica assignments, so we need to keep track of the original ones and restore them when things change (e.g. the controller fails over to another broker, or the swapped empty broker catches up). That is a rather tedious task.
> KIP is located at [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]
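The election order described above can be sketched as a pure function: move deprioritized brokers to the end of the replica list without touching the stored assignment, then take the first replica that is live and in the ISR. This is a hypothetical illustration of the idea in the description (assuming Java 11+); {{PreferredLeaderSketch}} and its parameters are not taken from the KIP-491 patch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of preferred-leader selection with a deprioritized broker list. */
final class PreferredLeaderSketch {
    private PreferredLeaderSketch() {}

    /** Stable reorder: other replicas keep their order, deprioritized ones go last. */
    static List<Integer> electionOrder(List<Integer> assignment, Set<Integer> deprioritized) {
        List<Integer> order = new ArrayList<>();
        List<Integer> tail = new ArrayList<>();
        for (int broker : assignment) {
            (deprioritized.contains(broker) ? tail : order).add(broker);
        }
        order.addAll(tail);
        return order;
    }

    /** First replica in election order that is live and in the ISR, if any. */
    static Optional<Integer> electLeader(List<Integer> assignment, Set<Integer> deprioritized,
                                         Set<Integer> liveBrokers, Set<Integer> isr) {
        for (int broker : electionOrder(assignment, deprioritized)) {
            if (liveBrokers.contains(broker) && isr.contains(broker)) {
                return Optional.of(broker);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> assignment = List.of(1, 2, 3); // stored assignment stays (1, 2, 3)
        Set<Integer> all = Set.of(1, 2, 3);
        System.out.println(electionOrder(assignment, Set.of(1)));         // [2, 3, 1]
        System.out.println(electLeader(assignment, Set.of(1), all, all)); // Optional[2]
        // Broker 2 offline: broker 3 leads before the deprioritized broker 1.
        System.out.println(electLeader(assignment, Set.of(1), Set.of(1, 3), Set.of(1, 3))); // Optional[3]
        // Only the deprioritized broker is left: it still becomes leader.
        System.out.println(electLeader(assignment, Set.of(1), Set.of(1), Set.of(1)));       // Optional[1]
    }
}
{code}

Because the reordering happens only at election time, the (1, 2, 3) => (2, 3, 1) effect of the manual workaround is obtained without rewriting the assignment, so there is nothing to restore afterwards.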
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17103028#comment-17103028 ] GEORGE LI commented on KAFKA-4084:
--
[~blodsbror] Hmm... that's weird. auto.leader.rebalance.enable seems to be functioning as intended; we need to make sure the controller has it set correctly. I wonder what the running config is. Could you run this:
{code}
~/confluent-kafka-go$ git remote -v
origin  https://github.com/confluentinc/confluent-kafka-go.git (fetch)
origin  https://github.com/confluentinc/confluent-kafka-go.git (push)

~/confluent-kafka-go$ go run examples/admin_describe_config/admin_describe_config.go :9092 broker |grep auto
auto.leader.rebalance.enable = false STATIC_BROKER_CONFIG Read-only:true Sensitive:false
auto.create.topics.enable = true STATIC_BROKER_CONFIG Read-only:true Sensitive:false
{code}
The auto.leader.rebalance.enable = false above is the real, running config. Some configs can be changed dynamically while the process is running, so I just want to make sure. Do this for all brokers.

Another cause might be cluster management software (like Cruise Control) running preferred leader election (PLE) periodically. That makes the current leader the first replica whenever the first replica is in the ISR.

> automated leader rebalance causes replication downtime for clusters with too many partitions
>
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Reporter: Tom Crayford
> Priority: Major
> Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default) and you have a cluster with many partitions, there is a severe amount of replication downtime following a restart. This causes `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes leaders for *all* imbalanced partitions at once, instead of doing it gradually. This effectively stops all replica fetchers in the cluster (assuming there are enough imbalanced partitions) and restarts them. This can take minutes on busy clusters, during which no replication is happening and user data is at risk. Clients with {{acks=-1}} also see issues at this time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election manually. There is also a broker configuration “auto.leader.rebalance.enable” which you can set to have the broker automatically perform the PLE when needed. DO NOT USE THIS OPTION. There are serious performance issues when doing so, especially on larger clusters. It needs some development work that has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high partition counts it causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of partitions to do automated leader rebalancing for at once, and *stop* once that number of leader rebalances are in flight, until they're done. There may be better mechanisms, and I'd love to hear if anybody has any ideas.
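The potential fix above (cap the number of leader rebalances in flight and wait for them before starting more) can be sketched as a batching loop. The {{maxInFlight}} limit and the {{BatchedRebalanceSketch}} class are hypothetical; no such broker configuration is implied, and the controller's real election and completion tracking are replaced by a callback that is assumed to block until its batch is done (Java 11+ assumed).

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of throttled leader rebalancing; not the Kafka controller. */
final class BatchedRebalanceSketch {
    private BatchedRebalanceSketch() {}

    /** Split imbalanced partitions into batches of at most maxInFlight. */
    static <T> List<List<T>> batches(List<T> imbalanced, int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < imbalanced.size(); i += maxInFlight) {
            result.add(List.copyOf(imbalanced.subList(i, Math.min(i + maxInFlight, imbalanced.size()))));
        }
        return result;
    }

    /** Run each batch to completion before the next starts, bounding concurrent leader moves. */
    static <T> int rebalance(List<T> imbalanced, int maxInFlight, Consumer<List<T>> electAndAwait) {
        List<List<T>> plan = batches(imbalanced, maxInFlight);
        for (List<T> batch : plan) {
            electAndAwait.accept(batch); // assumed to block until these elections complete
        }
        return plan.size();
    }

    public static void main(String[] args) {
        List<String> partitions = new ArrayList<>();
        for (int p = 0; p < 7; p++) {
            partitions.add("topic-" + p);
        }
        int rounds = rebalance(partitions, 3, batch -> System.out.println("moving leaders for " + batch));
        System.out.println(rounds + " rounds");
    }
}
{code}

With 7 imbalanced partitions and a limit of 3, leadership moves in rounds of 3, 3 and 1, so at most 3 partitions' fetchers are restarted at any moment instead of all of them at once.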
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772 ] GEORGE LI edited comment on KAFKA-4084 at 4/19/20, 7:28 AM: [~blodsbror] I am not very familiar with the 5.4 setup. Do you have the error message of the crash in the log? Is it missing the zkclient jar, like below?
{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar
     0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
     0 Mon Nov 18 18:11:58 UTC 2019 org/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}
If this jar file was there before, please copy it back. I need to find out why it was missing after the build; it may be some dependency setup in Gradle. I have also updated the [install doc |https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit] to use `./gradlew clean build -x test`.

Also make sure the Kafka startup script does not hard-code the 5.4 jars but takes the jars from the lib classpath, e.g.:
{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java \
  -Dlog4j.configuration=file:/etc/kafka/log4j.xml \
  -Xms22G -Xmx22G \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G \
  -XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 -XX:G1HeapWastePercent=1 \
  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -verbose:gc \
  -Xloggc:/var/log/kafka/gc-kafka.log \
  -server \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=29010 \
  -Djava.rmi.server.hostname=kafka12345-dca4 \
  -cp '.:/usr/share/kafka/lib/*' \
  kafka.Kafka /etc/kafka/server.properties
{code}
If you give us more details, we can help more. Thanks

Actually, I just patched the Gradle build and added the zkclient libs back. Please run "git clone https://github.com/sql888/kafka.git" (or git pull) and try to build again. I suspect that was the issue. Otherwise, we need to see the errors from the crash in the Kafka logs.
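A quick way to check the first question in the comment above is to look for the jar in the classpath directory. This is a minimal sketch. `find_zkclient_jar` is a hypothetical helper and not part of Kafka. The default directory `/usr/share/kafka/lib` is taken from the `-cp '.:/usr/share/kafka/lib/*'` entry of the startup command and may differ on a 5.4 install.

```shell
# Hypothetical helper (not part of Kafka): print the path of the first
# zkclient-*.jar in a Kafka lib directory, or fail if there is none.
# Default directory is an assumption taken from the -cp entry of the
# startup command.
find_zkclient_jar() {
  lib_dir=${1:-/usr/share/kafka/lib}
  for jar in "$lib_dir"/zkclient-*.jar; do
    # If nothing matches, the glob stays literal, so check the file exists.
    if [ -f "$jar" ]; then
      echo "$jar"
      return 0
    fi
  done
  echo "no zkclient jar found in $lib_dir" >&2
  return 1
}
```

If it prints a path, running `jar tvf` on that file should list the `org/I0Itec/zkclient/` classes, as in the listing. If it fails, the jar is missing from the directory the broker's classpath points at.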
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772 ] GEORGE LI edited comment on KAFKA-4084 at 4/19/20, 7:20 AM: [~blodsbror] I am not very familiar with the 5.4 setup. Do you have the error message of the crash in the log? Is it missing the zkclient jar, like below?
{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar
     0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
     0 Mon Nov 18 18:11:58 UTC 2019 org/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}
If this jar file was there before, please copy it back. I need to find out why it was missing after the build; it may be some dependency setup in Gradle. I have also updated the [install doc |https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit] to use `./gradlew clean build -x test`.

Also make sure the Kafka startup script does not hard-code the 5.4 jars but takes the jars from the lib classpath, e.g.:
{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java \
  -Dlog4j.configuration=file:/etc/kafka/log4j.xml \
  -Xms22G -Xmx22G \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G \
  -XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 -XX:G1HeapWastePercent=1 \
  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -verbose:gc \
  -Xloggc:/var/log/kafka/gc-kafka.log \
  -server \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=29010 \
  -Djava.rmi.server.hostname=kafka12345-dca4 \
  -cp '.:/usr/share/kafka/lib/*' \
  kafka.Kafka /etc/kafka/server.properties
{code}
If you give us more details, we can help more. Thanks

Actually, I just patched the Gradle build and added the zkclient libs back. Please run "git clone https://github.com/sql888/kafka.git" and try to build again. I suspect that was the issue. Otherwise, we need to see the errors from the crash in the Kafka logs.
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] I am not very familiar with the 5.4 setup. Do you have the error message of the crash in the log? Is it missing the zkclient jar, like below?
{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar
     0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
     0 Mon Nov 18 18:11:58 UTC 2019 org/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}
If this jar file was there before, please copy it back. I need to find out why it was missing after the build; it may be some dependency setup in Gradle. I have also updated the [install doc |https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit] to use `./gradlew clean build -x test`.

Also make sure the Kafka startup script does not hard-code the 5.4 jars but takes the jars from the lib classpath, e.g.:
{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java \
  -Dlog4j.configuration=file:/etc/kafka/log4j.xml \
  -Xms22G -Xmx22G \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G \
  -XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 -XX:G1HeapWastePercent=1 \
  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -verbose:gc \
  -Xloggc:/var/log/kafka/gc-kafka.log \
  -server \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=29010 \
  -Djava.rmi.server.hostname=kafka12345-dca4 \
  -cp '.:/usr/share/kafka/lib/*' \
  kafka.Kafka /etc/kafka/server.properties
{code}
If you give us more details, we can help more.
Thanks > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. 
There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077958#comment-17077958 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] Doing PLE on too many partitions at once is probably not good. We have a script that takes all partitions with a preferred leader imbalance (i.e. current leader != first replica) whose first replica is in the ISR. We then divide them into batches (e.g. 100 partitions per batch) and throttle by sleeping about 5-10 seconds between batches. We also verify each batch after submitting it for PLE, e.g. that the ZK node //admin/preferred_replica_election is gone.

For the KIP-491 patch, maybe I should write a wrapper for doing PLE, because now the logic is not just current_leader != first replica, but: current_leader != 

The batch logic basically writes the topic/partitions into a JSON file (e.g. 100 per batch) and then submits that batch using the open-source script `kafka-preferred-replica-election.sh`. Below is a shell script that does PLE for one topic (all partitions). It still uses ZK to submit the JSON; it can be changed to --bootstrap-server.
{code}
$ cat topic_preferred_leader_election.sh
.
name=$1
topic=$2
kafka_cluster_name="${name}"
zk=$(kafka_zk_lookup ${kafka_cluster_name})
json_filename="${name}_${topic}_leader_election.json"
touch ${json_filename}
echo "{\"partitions\":[" >${json_filename}
IFS=$'\n'
for partition in `/usr/lib/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper $zk --describe --topic $topic 2>/dev/null |grep Partition:|awk -F "Partition:" '{print $2}'|awk '{print $1}'`
do
  if [ "$partition" == "0" ]
  then
    echo " {\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  else
    echo ",{\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  fi
done
echo "]}" >>${json_filename}
/usr/lib/kafka/bin/kafka-preferred-replica-election.sh --zookeeper $zk --path-to-json-file ${json_filename} 2>/dev/null
#rm ${json_filename}
{code}
To troubleshoot the timeout, maybe check the ZK node //admin/preferred_replica_election and see whether any PLE is pending there. Could it be because of the KIP-491 preferred leader deprioritized/black list? I doubt it, because I have tested that it works. Did this PLE work before applying the KIP-491 patch? I think a ZooKeeper node has a size limit of 1MB, so doing PLE on 5000-6000 partitions all together in one batch might not work. How about trying one topic first, then 100 in a batch?
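The batched PLE approach from the comment above works in four steps: select the imbalanced partitions whose first replica is in the ISR, write about 100 per JSON file, submit each file, and sleep between batches. It can be sketched as below.

This is a minimal sketch under assumptions, not the script used in that environment:
- The input is assumed to be `kafka-topics.sh --describe` output with `Topic:`, `Partition:`, `Leader:`, `Replicas:` and `Isr:` fields.
- The function names are hypothetical.
- Kafka's bin directory is assumed to be on `PATH`.
- The JSON layout is the same `{"partitions":[{"topic":...,"partition":...}]}` format the script above writes.

```shell
# Sketch of batched preferred leader election (PLE). Function names are hypothetical.

# Read `kafka-topics.sh --describe` output on stdin and print "topic partition"
# for each partition whose leader is not its preferred (first) replica and
# whose preferred replica is in the ISR. Fields are found by label, so tab- or
# space-separated output both work.
select_imbalanced() {
  awk '
    /Partition:/ {
      topic = ""; part = ""; leader = ""; replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
        if ($i == "Isr:")       isr = $(i + 1)
      }
      split(replicas, r, ",")
      preferred = r[1]
      # Eligible only if preferred != current leader and preferred is in the ISR.
      if (preferred != leader && index("," isr ",", "," preferred ",") > 0)
        print topic, part
    }'
}

# Read "topic partition" lines on stdin and write PREFIX_1.json, PREFIX_2.json, ...
# each holding at most SIZE partitions.
write_batches() {
  awk -v prefix="$1" -v size="$2" '
    {
      if (n == 0) {                      # start a new batch file
        batch++
        file = prefix "_" batch ".json"
        printf "{\"partitions\":[" > file
      } else {
        printf "," > file
      }
      printf "{\"topic\":\"%s\",\"partition\":%d}", $1, $2 > file
      if (++n == size) { print "]}" > file; close(file); n = 0 }
    }
    END { if (n > 0) { print "]}" > file; close(file) } }'
}

# Submit each batch through ZooKeeper, sleeping between batches (default 5 s).
run_batches() {
  zk=$1 prefix=$2 pause=${3:-5}
  for f in "$prefix"_*.json; do
    [ -f "$f" ] || continue
    kafka-preferred-replica-election.sh --zookeeper "$zk" --path-to-json-file "$f"
    sleep "$pause"
  done
}
```

Example use: `kafka-topics.sh --zookeeper "$zk" --describe | select_imbalanced | write_batches ple 100`, then `run_batches "$zk" ple 10`. The glob visits files in lexical order (`ple_1`, `ple_10`, `ple_2`, ...), which should not matter for PLE. The per-batch check that //admin/preferred_replica_election is gone is left out of this sketch. On 2.4+ clusters, `kafka-leader-election.sh --bootstrap-server ... --election-type preferred --path-to-json-file` should accept the same JSON without going through ZooKeeper. Check the tool's `--help` on your version.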
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076124#comment-17076124 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] If this turns out positive in testing, I can restart the discussion on the dev mailing list for KIP-491; at least it works/helps with auto.leader.rebalance.enable=true. There are other use cases listed in KIP-491. For example, when the controller is busy with metadata requests, you can set this dynamic config for the controller and run PLE. The controller then gives up all its leadership and acts just as a follower, its CPU usage goes down (10-15%), and it becomes lightweight for doing its work, with no need to bounce the controller. I know some company is working on a feature that separates the controller onto another set of machines.

Our primary use case for this `leader.deprioritized.list=` feature is bundled together with another feature called replica.start.offlet.strategy=latest, which I have not filed a KIP for (the default is earliest, like the current Kafka behavior). This is also a dynamic config and can be set at the broker level (or globally for the cluster). What it does: when a broker fails, loses all its local disks and is replaced with an empty broker, the empty broker by default needs to start replication from the earliest offset. For us this could be 20TB+ of data over a few hours, and it can cause outages if not throttled properly. So, just like the Kafka consumer, we introduced the dynamic config replica.start.offlet.strategy=latest to replicate only from each partition leader's latest offset. Once the broker has caught up (URP => 0 for this broker), usually in 5-10 minutes or sooner, the dynamic config is removed. Because this broker does not have all the historical data, it should not be serving leadership. That is where KIP-491's `leader.deprioritized.list=` comes into play. The automation software calculates the retention time at the broker and topic level and takes the max. Once the broker has been replicating for that amount of time (e.g. 6 hours, 1 day, 3 days, whatever), the automation software removes the leader.deprioritized.list dynamic config for the broker and runs PLE to move leadership back to it.
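The retention-based wait described in the comment above can be sketched as below. This is a minimal sketch, not the automation software mentioned there:
- The helper names are hypothetical.
- Retention values are assumed to be in milliseconds, as `retention.ms` and `log.retention.ms` are.
- Treating `-1` (unlimited retention) as "never holds the full history" is an added assumption.

```shell
# Hypothetical helpers, not part of Kafka or the KIP-491 patch.

# Print the largest of the given retention values in ms (broker default plus
# topic overrides). Print -1 if any value is -1, i.e. unlimited retention.
max_retention_ms() {
  max=0
  for r in "$@"; do
    if [ "$r" -eq -1 ]; then
      echo -1
      return 0
    fi
    if [ "$r" -gt "$max" ]; then
      max=$r
    fi
  done
  echo "$max"
}

# ready_for_leadership START NOW RETENTION_MS...
# Succeed once a broker that started replicating at START (epoch seconds) has,
# at NOW (epoch seconds), been replicating for at least the max retention.
ready_for_leadership() {
  start=$1 now=$2
  shift 2
  max=$(max_retention_ms "$@")
  # With unlimited retention the broker never holds the full history.
  [ "$max" -ne -1 ] || return 1
  [ $(( (now - start) * 1000 )) -ge "$max" ]
}
```

When `ready_for_leadership` succeeds, the automation would remove the broker from `leader.deprioritized.list` and run PLE, as the comment describes.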
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076089#comment-17076089 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] I think it is still OK to set auto.leader.rebalance.enable=true. But for a broker that failed and was replaced, coming up empty, there should be some kind of automation that first sets the `leader.deprioritized.list=` dynamic config at the '' cluster-global level. That way the current controller, and any other controller after a failover, can put it into effect immediately. Then start the replacement broker. While the broker is busy catching up, it has lower priority for being considered as leader, so auto.leader.rebalance.enable=true is effectively disabled for this broker. After the broker catches up (e.g. URP => 0, CPU/disk etc. back to normal), the automation script can remove the dynamic config above. With auto.leader.rebalance.enable=true, leadership then automatically moves back to this broker for the partitions where it is the preferred leader (first/head of the partition assignment).
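One way to picture "lower priority for being considered for leaders" is the selection below. This is a hypothetical illustration of the behavior described in the comment above, not the controller code in the KIP-491 patch. Brokers on the deprioritized list are moved behind the others in the assignment order, and the first in-ISR broker of that reordered list is chosen.

```shell
# Hypothetical illustration of leader choice with a deprioritized list.
# Not the KIP-491 patch code.

# in_list ID LIST: succeed if ID is an element of the comma-separated LIST.
in_list() {
  case ",$2," in
    *",$1,"*) return 0 ;;
  esac
  return 1
}

# effective_leader REPLICAS ISR DEPRIORITIZED (comma-separated broker ids):
# walk the assignment order, first skipping deprioritized brokers, then
# considering them only as a fallback; print the first broker in the ISR.
effective_leader() {
  replicas=$1 isr=$2 deprio=$3
  for pass in normal fallback; do
    for b in $(printf '%s' "$replicas" | tr ',' ' '); do
      if in_list "$b" "$deprio"; then
        [ "$pass" = fallback ] || continue
      else
        [ "$pass" = normal ] || continue
      fi
      if in_list "$b" "$isr"; then
        echo "$b"
        return 0
      fi
    done
  done
  return 1
}
```

Take broker 1 as the replaced broker. With it deprioritized, a partition assigned `1,2,3` keeps its leader on broker 2 while broker 1 catches up. Once the list is cleared, the same call returns 1 again, and that is where auto.leader.rebalance.enable=true would move leadership back.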
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076006#comment-17076006 ] GEORGE LI edited comment on KAFKA-4084 at 4/6/20, 7:13 AM: --- [~blodsbror] I had some free time this weekend to troubleshoot, and found that after 1.1 (at least in 2.4) the controller code has an optimization for PLE: it does not run PLE at all if current_leader == head of the replica list. That had caused my unit/integration tests to fail, and I have patched that as well. I have landed my code change in my repo's feature branch 2.4-leader-deprioritized-list (based on the 2.4 branch). Detailed installation and testing steps are in this [Google doc|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]. Please let me know if you have issues with the patch/testing. If you cannot view the doc, please click the request access button, or send me your email so I can add you to the share. My email: sqlconsult...@gmail.com Please keep us posted with your testing results. Thanks, George
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076006#comment-17076006 ] GEORGE LI edited comment on KAFKA-4084 at 4/6/20, 2:26 AM: ---
[~blodsbror] I had some free time this weekend to troubleshoot, and found that after 1.1 (at least in 2.4) the controller code has an optimization for PLE: it does not run PLE at all if current_leader == head of the replica list. That caused my unit/integration tests to fail, and I have patched that as well. I have landed my code change in my repo's feature branch 2.4-leader-deprioritized-list (based on the 2.4 branch), with detailed installation and testing steps in this [Google doc|https://docs.google.com/document/d/1ZuOcYTSuCAqCut_hjI_EY3lA9W7BuIlHVUdOUcSpWww/edit]. Please let me know if you have issues with the patch/testing. If you cannot view the doc, please click the request access button, or send me your email to add to the share. My email: sqlconsult...@gmail.com
Please keep us posted with your testing results.
Thanks, George

> automated leader rebalance causes replication downtime for clusters with too many partitions
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Reporter: Tom Crayford
> Priority: Major
> Labels: reliability
> Fix For: 1.1.0
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and you have a cluster with many partitions, there is a severe amount of replication downtime following a restart. This causes `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes leaders for *all* imbalanced partitions at once, instead of doing it gradually. This effectively stops all replica fetchers in the cluster (assuming there are enough imbalanced partitions), and restarts them. This can take minutes on busy clusters, during which no replication is happening and user data is at risk. Clients with {{acks=-1}} also see issues at this time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election manually. There is also a broker configuration “auto.leader.rebalance.enable” which you can set to have the broker automatically perform the PLE when needed. DO NOT USE THIS OPTION. There are serious performance issues when doing so, especially on larger clusters. It needs some development work that has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of partitions to do automated leader rebalancing for at once, and *stop* once that number of leader rebalances are in flight, until they're done. There may be better mechanisms, and I'd love to hear if anybody has any ideas.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
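The fix proposed in the description can be sketched as a batching loop: cap how many automated leader rebalances are in flight, and wait until they finish before starting more. This is a hypothetical Python model, not Kafka's controller code. `max_in_flight` stands in for the proposed (unnamed) configuration, and `elect` is an assumed blocking hook that runs preferred leader election for one batch. "Imbalanced" is simplified to "not led by the first replica", which ignores the broker-level `leader.imbalance.per.broker.percentage` threshold.

```python
from typing import Callable, Dict, List, Tuple

TopicPartition = Tuple[str, int]


def imbalanced_partitions(
    assignments: Dict[TopicPartition, List[int]],
    leaders: Dict[TopicPartition, int],
) -> List[TopicPartition]:
    """Partitions whose current leader is not the preferred (first) replica."""
    return sorted(
        tp for tp, replicas in assignments.items()
        if replicas and leaders.get(tp) != replicas[0]
    )


def rebalance_in_batches(
    assignments: Dict[TopicPartition, List[int]],
    leaders: Dict[TopicPartition, int],
    elect: Callable[[List[TopicPartition]], None],
    max_in_flight: int,
) -> List[List[TopicPartition]]:
    """Move leadership back to preferred replicas, at most max_in_flight at a time.

    `elect` (hypothetical) is assumed to block until every election in the
    batch has completed, so the next batch only starts once the previous one
    is done and replica fetchers are never all restarted at once.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be >= 1")
    pending = imbalanced_partitions(assignments, leaders)
    batches: List[List[TopicPartition]] = []
    for start in range(0, len(pending), max_in_flight):
        batch = pending[start:start + max_in_flight]
        elect(batch)  # run preferred leader election for this batch only
        batches.append(batch)
    return batches
```

With five imbalanced partitions and `max_in_flight=2`, the elections go out in three rounds of 2, 2 and 1 partitions, instead of all five leadership changes landing at once.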
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072259#comment-17072259 ] GEORGE LI commented on KAFKA-4084: --
[~blodsbror] I have been very busy with work and this COVID-19 Jira work-from-home ticket. :) The last time I worked on it, I had issues getting some unit tests to pass after porting from 1.1.x to 2.4. Let me look into them again. Sorry for the delay.
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17051509#comment-17051509 ] GEORGE LI commented on KAFKA-4084: --
[~blodsbror] Sure. I am working on porting the patch from 1.1.x to 2.4. It looks like some logic was split out of PartitionStateMachine.scala into Election.scala. I will need some time to modify that and the corresponding unit/integration/system tests. I will post the patch once it is ready.
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17048116#comment-17048116 ] GEORGE LI commented on KAFKA-4084: --
[~blodsbror] [~sriharsha] Hi Evan, I am so sorry; I had been busy with work and forgot to provide the KIP-491 patch for Confluent 5.4. I just checked the backlog emails. Let me work on it this weekend.
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032598#comment-17032598 ] GEORGE LI commented on KAFKA-4084: --
[~blodsbror] Which Kafka version are you running? Maybe I can provide a diff of the KIP-491 changes for you to apply on your end and try it out?
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267 ] GEORGE LI edited comment on KAFKA-4084 at 2/7/20 10:02 AM: ---
[~blodsbror] [~junrao] [~sriharsha] With `auto.leader.rebalance.enable=true`, even if the controller checks that the broker is in the ISR before rebalancing leadership to it, the partitions this broker leads may still be impacted, because the broker is still trying to catch up.

[~blodsbror] Does your cluster have a lossless setting like min.insync.replicas > 1? With lossless settings, we experience high producer latency during catch-up or reassignments. The leader deprioritized list feature is convenient in this case: it simply puts the catching-up broker at the lowest priority when choosing a leader.

Another useful case is when the current controller is very busy with metadata requests: "blacklisting" it so that it only serves as a follower can give 10-15% CPU back to the controller (without bouncing it).

In a cluster without any down brokers and no URP (Under Replicated Partitions), a workaround is to run reassignments that move that broker to the end of the partition assignment, e.g. broker_id 100 is down, then change the partition assignment (100, 101, 102) => (101, 102, 100). The reassignment should complete quickly because all replicas are in the ISR. Then running preferred leader election will change the leader from 100 => 101. The downside is that it is more work to roll back or rebalance again. KIP-491 is very easy to roll back: just unset the dynamic config.

In a cluster with URP, the approach of reassigning the broker to the end of the assignment might not work, because the broker is not in the ISR; the reassignments will stay pending until it catches up and rejoins the ISR. So maybe we do have a good use case for KIP-491?
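The reassignment workaround above moves the broker to the end of each replica list, so that preferred leader election picks the next replica. It could be scripted roughly as follows. This is a hedged sketch. It assumes the JSON layout accepted by `kafka-reassign-partitions.sh --reassignment-json-file` (`version`, `partitions`, `topic`, `partition`, `replicas`), and the function names `demote_broker` and `to_reassignment_json` are hypothetical.

```python
import json
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def demote_broker(
    assignments: Dict[TopicPartition, List[int]], broker_id: int
) -> Dict[TopicPartition, List[int]]:
    """Return new assignments with broker_id moved to the end of each replica list.

    Only partitions that contain broker_id and do not already list it last are
    included, so the reassignment touches as few partitions as possible.
    """
    changed: Dict[TopicPartition, List[int]] = {}
    for tp, replicas in assignments.items():
        if broker_id in replicas and replicas[-1] != broker_id:
            changed[tp] = [r for r in replicas if r != broker_id] + [broker_id]
    return changed


def to_reassignment_json(changed: Dict[TopicPartition, List[int]]) -> str:
    """Serialize in the layout used by kafka-reassign-partitions.sh (assumed)."""
    partitions = [
        {"topic": topic, "partition": partition, "replicas": replicas}
        for (topic, partition), replicas in sorted(changed.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions})
```

For the comment's example, `demote_broker({("t", 0): [100, 101, 102]}, 100)` yields `[101, 102, 100]` for that partition. Keeping the original assignments around is what makes rollback possible. That bookkeeping is the extra work the comment contrasts with simply unsetting KIP-491's dynamic config.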
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030183#comment-17030183 ] GEORGE LI commented on KAFKA-4084: --
[~evanjpw] One solution we have in-house is tooling to do an "offline rebuild" of the failed, empty broker. The idea is to keep Kafka on the new empty broker down (not serving any traffic) and copy the log segment and index files for the partitions this broker is supposed to host from other brokers in the cluster. The copy is distributed and throttled so it does not affect the brokers serving production traffic. Once the copy is done, modify the replication offset metadata file and start Kafka on the new node. Since the historical logs are already copied, the "delta" catch-up from the leaders will be small, and the broker will catch up faster. This is just the main idea; it is internal tooling tied to our infrastructure. Hope this helps.
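For the "modify the replication offset meta file" step, here is a sketch of reading and rewriting a `replication-offset-checkpoint` file. It assumes the plain-text layout Kafka uses for its offset checkpoint files: a version line `0`, an entry-count line, then one `topic partition offset` line per partition. Verify this against the broker version in use. The in-house tooling described above is not public, so this only illustrates the file handling, not which offsets that tooling writes.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]
CHECKPOINT_VERSION = 0  # assumed version marker on the first line


def parse_checkpoint(text: str) -> Dict[TopicPartition, int]:
    """Parse 'version / count / topic partition offset' lines into a dict."""
    lines = [line for line in text.splitlines() if line.strip()]
    if len(lines) < 2:
        raise ValueError("checkpoint needs a version line and a count line")
    version = int(lines[0])
    if version != CHECKPOINT_VERSION:
        raise ValueError(f"unsupported checkpoint version {version}")
    expected = int(lines[1])
    entries: Dict[TopicPartition, int] = {}
    for line in lines[2:]:
        # Kafka topic names cannot contain whitespace, so a plain split is safe.
        topic, partition, offset = line.split()
        entries[(topic, int(partition))] = int(offset)
    if len(entries) != expected:
        raise ValueError(f"expected {expected} entries, found {len(entries)}")
    return entries


def format_checkpoint(entries: Dict[TopicPartition, int]) -> str:
    """Render entries back into the same layout, sorted for stable output."""
    out = [str(CHECKPOINT_VERSION), str(len(entries))]
    for (topic, partition), offset in sorted(entries.items()):
        out.append(f"{topic} {partition} {offset}")
    return "\n".join(out) + "\n"
```

Any rewrite should happen while the broker is stopped. Writing to a temporary file and renaming it over the original avoids leaving a half-written checkpoint if the tool is interrupted.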
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299 ] GEORGE LI commented on KAFKA-4084: --
[~blodsbror] We have implemented KIP-491 internally, using a dynamic config {{leader.deprioritized.list}} set at the cluster-wide default level ({{--entity-default}}), so that in case of controller failover the new controller inherits this dynamic config setting.
{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, dataLength=59, numChildren=0, pzxid=25790129667))
{code}
This puts broker 10001 at the lowest priority when the controller is choosing the leader for a partition, regardless of whether this broker is in the first position of the assignment (namely, the preferred leader). If it is currently serving as leader, the preferred leader election will move leadership to another broker in the ISR.

We have also implemented another feature, separate from KIP-491. When an empty broker is starting up, a dynamic config for that broker called "{{replica.start.offset.strategy}}" is set to "latest" (the default is "earliest", matching the current upstream behavior). Just like a consumer, the broker then fetches from the current leaders' latest offsets instead of the earliest (start) offset, so the failed empty broker comes up very fast. This feature is used together with KIP-491 {{leader.deprioritized.list}} to blacklist this broker from serving traffic (because it does not have enough data). After it has been replicating for some time (the retention period at the broker/topic level), the broker is completely caught up and the {{leader.deprioritized.list}} entry is removed. When preferred leader election is run, this broker can serve traffic again. We haven't proposed this in any KIP yet, but I think this is also a good feature. Maybe I will restart the KIP-491 discussion on the dev mailing list.
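The two mechanisms in this comment can be modeled in a few lines. This is a hypothetical sketch of the described behavior, not the internal patch. `leader.deprioritized.list` and `replica.start.offset.strategy` are the internal (non-upstream) configs named in the comment, and the function names are invented. `preferred_leader` moves deprioritized brokers to the end of the assignment, then picks the first live, in-ISR replica. `needs_election` compares the current leader with that deprioritized-aware choice, not with the head of the replica list. That is the point where the "skip PLE if current_leader == head of replica" optimization would otherwise short-circuit.

```python
from typing import Iterable, List, Optional, Set


def preferred_leader(
    assignment: List[int],
    isr: Set[int],
    live: Set[int],
    deprioritized: Iterable[int] = (),
) -> Optional[int]:
    """Pick a leader, treating deprioritized brokers as if they were listed last.

    The relative order of the other brokers is kept; with an empty
    deprioritized list this picks the first live, in-ISR replica in
    assignment order. Deprioritized brokers stay eligible, just last.
    """
    low = set(deprioritized)
    ordered = [b for b in assignment if b not in low] + [b for b in assignment if b in low]
    for broker in ordered:
        if broker in live and broker in isr:
            return broker
    return None


def needs_election(
    current_leader: int,
    assignment: List[int],
    isr: Set[int],
    live: Set[int],
    deprioritized: Iterable[int] = (),
) -> bool:
    """Elect only when the deprioritized-aware choice differs from the current leader."""
    target = preferred_leader(assignment, isr, live, deprioritized)
    return target is not None and target != current_leader


def start_fetch_offset(strategy: str, leader_log_start: int, leader_log_end: int) -> int:
    """Where an empty replica starts fetching: 'earliest' (upstream) or 'latest'."""
    if strategy == "earliest":
        return leader_log_start
    if strategy == "latest":
        return leader_log_end
    raise ValueError(f"unknown strategy {strategy!r}")
```

The deprioritized broker stays in the candidate list rather than being excluded, which matches the comment's "lowest priority" wording. If it is the only in-ISR replica, it can still lead.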
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299 ] GEORGE LI edited comment on KAFKA-4084 at 2/3/20 9:26 PM: -- [~blodsbror] We have implemented KIP-491 internally. Using a dynamic config leader.deprioritized.list to set it for the cluster global level ('') so in case of controller failover, the new controller will inherit this dynamic config settings. {code} bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config leader.deprioritized.list=10001 $ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/'' ('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, dataLength=59, numChildren=0, pzxid=25790129667)) {code} This will put the broker 10001 to the lowest priority when controller is considering leadership for that partition, regardless this broker is in the 1st position of the assignment (namely : preferred leader), if this is currently serving leadership, the preferred leader election will move it to another broker in the ISR. We have also implemented another feature separate from KIP-491 that when an empty broker is starting up, a dynamic config for that broker called "{{replica.start.offset.strategy}}" is set to "latest" (default is "earliest" like current upstream behavior), just like a consumer, it will fetch from current leaders' latest offsets instead of earliest(start) offset. So this makes the failed empty broker coming up very fast.This feature is used together with KIP-491 {{leader.deprioritized.list}} to blacklist this broker to serve traffic (because it does not have enough data). After it's in replication for sometime (retention of the broker/topic level), this broker is completely caught-up, and the {{leader.deprioritized.list}} is removed. 
When preferred leader election is run afterwards, this broker can serve traffic again. We haven't proposed this in any KIP yet, but I think it is also a good feature. Maybe I will restart the KIP-491 discussion on the dev mailing list. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels:
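The leader-selection effect of a deprioritized list described in this comment can be sketched in Java as follows. The sketch assumes the selection order given in the KAFKA-8638 description: pick the first replica in assignment order that is live and in the ISR. It further assumes that deprioritized brokers are simply moved to the end of that order. All class and method names are hypothetical, and this is neither the internal KIP-491 implementation nor Kafka's controller code (Java 11+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of KIP-491-style leader selection; not Kafka's controller code.
final class DeprioritizedLeaderElection {

    // Move deprioritized brokers to the end while keeping the relative order within
    // each group, so the stored replica assignment itself is never rewritten.
    static List<Integer> effectiveOrder(List<Integer> assignment, Set<Integer> deprioritized) {
        List<Integer> ordered = new ArrayList<>();
        List<Integer> demoted = new ArrayList<>();
        for (Integer broker : assignment) {
            if (deprioritized.contains(broker)) {
                demoted.add(broker);
            } else {
                ordered.add(broker);
            }
        }
        ordered.addAll(demoted);
        return ordered;
    }

    // Elect the first broker in the effective order that is both alive and in the ISR.
    // A deprioritized broker can still lead when no other eligible replica exists.
    static Optional<Integer> electLeader(List<Integer> assignment, Set<Integer> isr,
                                         Set<Integer> liveBrokers, Set<Integer> deprioritized) {
        for (Integer broker : effectiveOrder(assignment, deprioritized)) {
            if (isr.contains(broker) && liveBrokers.contains(broker)) {
                return Optional.of(broker);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> assignment = List.of(10001, 10002, 10003);
        Set<Integer> all = Set.of(10001, 10002, 10003);
        // 10001 is the preferred leader but is deprioritized, so 10002 is chosen.
        System.out.println(electLeader(assignment, all, all, Set.of(10001)));
        // With only 10001 alive and in the ISR, it is still elected.
        System.out.println(electLeader(assignment, Set.of(10001), Set.of(10001), Set.of(10001)));
    }
}
```

Reordering a view of the assignment, rather than the assignment itself, is what avoids the "change and later restore the reassignment" bookkeeping mentioned in KAFKA-8638.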
[jira] [Updated] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset
[ https://issues.apache.org/jira/browse/KAFKA-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GEORGE LI updated KAFKA-8903: - Description: In a large Kafka deployment with thousands of brokers, it is common, and sometimes frequent, for a broker to suffer hardware failures (disk, memory, CPU, NIC). The failed host is replaced by a new one with the same "broker.id", and the new broker starts up empty: all topic/partitions start at offset 0. If the current leader's start offset is > 0, this replacement broker starts each partition from the leader's earliest (start) offset. If the number and size of the partitions this broker hosts are large, it can take quite some time for the ReplicaFetcher threads to pull from all the leaders in the cluster. It can also add load to the brokers/leaders in the cluster, affecting latency and other performance metrics. Once this replacement broker has caught up, preferred leader election can be run to move leadership back to it. To avoid this performance impact and make failed-broker replacement much easier and more scalable, we are proposing a new dynamic config, _replica.start.offset.strategy_. The default is Earliest, and it can be dynamically set to Latest for a broker (or brokers). If it is set to Latest, all partitions on the empty broker start, at startup, from the current leader's latest offset (the log end offset, LEO). The replacement broker's replicas are then in the ISR, with 0 TotalLag/MaxLag and 0 URP, almost instantly. For preferred leader election, we can wait until the retention time has passed and this replacement broker has been replicating for long enough. The better/safer approach is to enable the Preferred Leader Blacklist described in KAFKA-8638 / [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]. That way, until this replacement broker has completely caught up, its priority in leadership decisions is moved to the lowest.
> allow the new replica (offset 0) to catch up with current leader using latest > offset > > > Key: KAFKA-8903 > URL: https://issues.apache.org/jira/browse/KAFKA-8903 > Project: Kafka > Issue Type: Improvement > Components: config, core >Affects Versions: 1.1.0, 1.1.1, 2.3.0 >Reporter: GEORGE LI >Assignee: GEORGE LI >Priority: Major > > It very common (and sometimes frequent) that a broker has hardware failures > (disk, memory, cpu, nic) for large Kafka deployment with thousands of > brokers. The failed host will be replaced by a new one with the same > "broker.id", and the new broker starts up as empty. All topic/partitions > will start with offset 0. If the current leader has start offset > 0, this > replaced broker will start the partition from the leader's earliest (start) > offset. > If the number of partitions and
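The Earliest/Latest choice proposed for _replica.start.offset.strategy_ can be modeled as a small Java sketch. It assumes the replica's local log is empty (LEO 0) and that the strategy only decides which leader offset the first fetch starts from. The enum, class, and method names are hypothetical and are not part of upstream Kafka. The sample offsets are borrowed from the log excerpt in the later comment on this issue, purely for illustration (Java 11+).

```java
import java.util.Locale;

// Hypothetical model of the proposed replica.start.offset.strategy config (KAFKA-8903).
final class ReplicaStartOffsetSketch {

    enum Strategy {
        EARLIEST, LATEST;

        // Parse a config value such as "Earliest" or "Latest". An unset value falls back
        // to EARLIEST, the proposed default; unknown values throw IllegalArgumentException.
        static Strategy fromConfig(String value) {
            if (value == null || value.isBlank()) {
                return EARLIEST;
            }
            return Strategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    // Offset an empty replica (local log end offset 0) starts fetching from.
    static long initialFetchOffset(Strategy strategy, long leaderLogStartOffset, long leaderLogEndOffset) {
        return strategy == Strategy.LATEST ? leaderLogEndOffset : leaderLogStartOffset;
    }

    // Offsets the replica must copy before reaching the leader's current log end offset.
    static long offsetsToReplicate(Strategy strategy, long leaderLogStartOffset, long leaderLogEndOffset) {
        return leaderLogEndOffset - initialFetchOffset(strategy, leaderLogStartOffset, leaderLogEndOffset);
    }

    public static void main(String[] args) {
        long start = 319246L; // illustrative values only
        long end = 339347L;
        for (String config : new String[] {null, "Latest"}) {
            Strategy s = Strategy.fromConfig(config);
            System.out.println(s + ": fetch from " + initialFetchOffset(s, start, end)
                + ", offsets to copy " + offsetsToReplicate(s, start, end));
        }
    }
}
```

With Latest, the replica copies nothing before reaching the leader's LEO. That is why it can rejoin the ISR almost instantly, and also why it lacks historical data until retention has rolled over.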
[jira] [Commented] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset
[ https://issues.apache.org/jira/browse/KAFKA-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928906#comment-16928906 ] GEORGE LI commented on KAFKA-8903: -- For example, setting the strategy to Latest for broker 1028:
{code:java}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config replica.start.offset.strategy=Latest --entity-type brokers --entity-name 1028
Completed updating config for broker: 1028.

[2019-09-08 06:48:34,997] 1307 [/config/changes-event-process-thread] INFO kafka.server.DynamicConfigManager - Processing override for entityPath: brokers/1028 with config: Map(replica.start.offset.strategy -> Latest)
..
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1028, leaderId=1025, fetcherId=1] brokerConfig.ReplicaStartOffsetStrategy: Latest
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1028, leaderId=1025, fetcherId=1] replica.logEndOffset.messageOffset: 0
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1028, leaderId=1025, fetcherId=1] replica.start.offset.strategy: Latest. Reset fetch offset for partition georgeli_test-0 from 0 to current leader's latest offset 339347
...
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO kafka.log.LogCleaner - The cleaning for partition georgeli_test-0 is aborted and paused
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO kafka.log.Log - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] Scheduling log segment [baseOffset 0, size 0] for deletion.
...
[2019-09-08 07:34:31,828] 1779 [ReplicaFetcherThread-1-1025] INFO kafka.log.LogCleaner - Compaction for partition georgeli_test-0 is resumed
...
{code}
To remove the config and fall back to the default "Earliest" (the leader's start offset):
{code:java}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config replica.start.offset.strategy --entity-type brokers --entity-name 1028
Completed updating config for broker: 1028.

[2019-09-08 23:18:20,581] 468051 [/config/changes-event-process-thread] INFO kafka.common.ZkNodeChangeNotificationListener - Processing notification(s) to /config/changes
[2019-09-08 23:18:20,588] 468058 [/config/changes-event-process-thread] INFO kafka.server.DynamicConfigManager - Processing override for entityPath: brokers/1028 with config: Map()
[2019-09-08 23:18:20,589] 468059 [/config/changes-event-process-thread] INFO kafka.server.KafkaConfig - KafkaConfig values:
    advertised.host.name = kafka1028-dc3
    ...
    replica.start.offset.strategy = Earliest
    ...
[2019-09-08 23:23:36,408] 1773 [ReplicaFetcherThread-1-1025] INFO kafka.log.Log - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] Truncating to 0 has no effect as the largest offset in the log is -1
[2019-09-08 23:23:36,439] 1804 [ReplicaFetcherThread-1-1025] WARN kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1028, leaderId=1025, fetcherId=1] Reset fetch offset for partition georgeli_test-0 from 0 to current leader's start offset 319246
[2019-09-08 23:23:36,440] 1805 [ReplicaFetcherThread-1-1025] INFO kafka.log.LogCleaner - The cleaning for partition georgeli_test-0 is aborted and paused
[2019-09-08 23:23:36,440] 1805 [ReplicaFetcherThread-1-1025] INFO kafka.log.Log - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] Scheduling log segment [baseOffset 0, size 0] for deletion.
[2019-09-08 23:23:36,441] 1806 [ReplicaFetcherThread-1-1025] INFO kafka.log.LogCleaner - Compaction for partition georgeli_test-0 is resumed
[2019-09-08 23:23:36,449] 1814 [ReplicaFetcherThread-1-1025] INFO kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1028, leaderId=1025, fetcherId=1] Current offset 0 for partition georgeli_test-0 is out of range, which typically implies a leader change. Reset fetch offset to 319246
...
{code}
From the above, we can see the effect of the new config "replica.start.offset.strategy" (Earliest vs. Latest) on the start offset of the empty/new partition: 319246 vs. 339347.
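To check which reset a fetcher actually applied, the WARN lines above can be parsed. Here is a small, hypothetical Java helper for that. Its regular expression is inferred only from the two "Reset fetch offset for partition ..." samples in this comment, so other log formats or versions may not match. The class and field names are illustrative.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that reads the fetcher's "Reset fetch offset" WARN lines.
// The pattern is inferred from the two sample lines in this comment only.
final class FetchOffsetResetParser {

    private static final Pattern RESET = Pattern.compile(
        "Reset fetch offset for partition (\\S+) from (\\d+) to current leader's (latest|start) offset (\\d+)");

    static final class Reset {
        final String partition;
        final long fromOffset;
        final String leaderOffsetKind; // "latest" or "start"
        final long toOffset;

        Reset(String partition, long fromOffset, String leaderOffsetKind, long toOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.leaderOffsetKind = leaderOffsetKind;
            this.toOffset = toOffset;
        }
    }

    // Returns the parsed reset, or empty if the line is not a reset-to-leader-offset line.
    static Optional<Reset> parse(String logLine) {
        Matcher m = RESET.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Reset(m.group(1), Long.parseLong(m.group(2)),
                                     m.group(3), Long.parseLong(m.group(4))));
    }

    public static void main(String[] args) {
        String[] lines = {
            "... replica.start.offset.strategy: Latest. Reset fetch offset for partition georgeli_test-0 from 0 to current leader's latest offset 339347",
            "... Reset fetch offset for partition georgeli_test-0 from 0 to current leader's start offset 319246"
        };
        for (String line : lines) {
            parse(line).ifPresent(r -> System.out.println(
                r.partition + ": " + r.fromOffset + " -> " + r.toOffset
                    + " (leader's " + r.leaderOffsetKind + " offset)"));
        }
    }
}
```

The later "Current offset 0 ... is out of range ... Reset fetch offset to 319246" line deliberately does not match, since it does not name the leader offset kind.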
[jira] [Created] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset
GEORGE LI created KAFKA-8903: Summary: allow the new replica (offset 0) to catch up with current leader using latest offset Key: KAFKA-8903 URL: https://issues.apache.org/jira/browse/KAFKA-8903 Project: Kafka Issue Type: Improvement Components: config, core Affects Versions: 2.3.0, 1.1.1, 1.1.0 Reporter: GEORGE LI Assignee: GEORGE LI -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900461#comment-16900461 ] GEORGE LI commented on KAFKA-4084: -- [~jiaxinye] Does the patch from https://issues.apache.org/jira/browse/KAFKA-7299 help? Any idea why step 6 has 100% CPU utilization? Is the ReplicaFetcher doing heavy pulling of data to the followers on the previously offline brokers? If so, [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982] might help: put this broker on a temporary preferred leader "blacklist", and once it is up and running stably with low CPU usage, remove it from the blacklist so the auto leader rebalance can proceed. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq.
There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
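The issue description quoted above suggests a new configuration for the number of partitions to rebalance at once, stopping once that many leader changes are in flight. Here is a minimal Java sketch of that batching idea. `maxInFlight` stands in for the hypothetical configuration, and `electAndAwait` is an assumed callback that blocks until every election in a batch has finished. Neither corresponds to an existing Kafka setting or API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the throttled rebalance proposed in KAFKA-4084:
// cap how many preferred-leader elections are in flight at once.
final class BatchedLeaderRebalance {

    // Split the imbalanced partitions into consecutive batches of at most maxInFlight.
    static <T> List<List<T>> batches(List<T> imbalanced, int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < imbalanced.size(); i += maxInFlight) {
            int end = Math.min(i + maxInFlight, imbalanced.size());
            result.add(new ArrayList<>(imbalanced.subList(i, end)));
        }
        return result;
    }

    // electAndAwait is assumed to block until every election in the batch completes,
    // so at most maxInFlight leader changes are outstanding at any time.
    static <T> void rebalance(List<T> imbalanced, int maxInFlight, Consumer<List<T>> electAndAwait) {
        for (List<T> batch : batches(imbalanced, maxInFlight)) {
            electAndAwait.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<String> imbalanced = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        rebalance(imbalanced, 2, batch -> System.out.println("electing " + batch));
    }
}
```

Processing batches sequentially trades total rebalance time for a bound on how many replica fetchers are disrupted at once, which is the trade-off the description asks for.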
[jira] [Commented] (KAFKA-8725) Improve LogCleaner error handling when failing to grab the filthiest log
[ https://issues.apache.org/jira/browse/KAFKA-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16899237#comment-16899237 ] GEORGE LI commented on KAFKA-8725: -- We saw similar issues. After the thread dies, the "good" partitions also stop being cleaned and accumulate a backlog. One more improvement might be to restart the log cleaner thread without bouncing the broker: the current dynamic config log.cleaner.threads seems to be able to start the thread only once. > Improve LogCleaner error handling when failing to grab the filthiest log > > > Key: KAFKA-8725 > URL: https://issues.apache.org/jira/browse/KAFKA-8725 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Major > > https://issues.apache.org/jira/browse/KAFKA-7215 improved error handling in > the log cleaner with the goal of not having the whole thread die when an > exception happens, but rather mark the partition that caused it as > uncleanable and continue cleaning the error-free partitions. > Unfortunately, the current code can still bubble up an exception and cause > the thread to die when an error happens before we can grab the filthiest log > and start cleaning it. At that point, we don't have a clear reference to the > log that caused the exception and chose to throw an IllegalStateException - > [https://github.com/apache/kafka/blob/39bcc8447c906506d63b8df156cf90174bbb8b78/core/src/main/scala/kafka/log/LogCleaner.scala#L346] > (as seen in https://issues.apache.org/jira/browse/KAFKA-8724) > Essentially, exceptions in `grabFilthiestCompactedLog` still cause the thread > to die. This can be further improved by trying to catch what log caused the > exception in the aforementioned function -- This message was sent by Atlassian JIRA (v7.6.14#76016)
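The improvement suggested in KAFKA-8725 is to catch which log caused the exception while grabbing the filthiest log. A minimal Java sketch of that idea follows. It assumes a per-log `dirtyRatio` computation that may throw, marks only the failing log as uncleanable, and keeps going. Names are hypothetical, and this is not the real `LogCleaner`/`grabFilthiestCompactedLog` code. It also does not address restarting a dead cleaner thread (Java 9+).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.ToDoubleFunction;

// Hypothetical sketch: pick the "filthiest" log while attributing any failure to the
// specific log that caused it, instead of letting the exception kill the cleaner thread.
final class FilthiestLogSelector {

    private final Set<String> uncleanable = new HashSet<>();

    // dirtyRatio stands in for whatever per-log computation may throw (assumption).
    // A real cleaner also applies thresholds such as min.cleanable.dirty.ratio; omitted here.
    Optional<String> grabFilthiest(List<String> logs, ToDoubleFunction<String> dirtyRatio) {
        String best = null;
        double bestRatio = Double.NEGATIVE_INFINITY;
        for (String log : logs) {
            if (uncleanable.contains(log)) {
                continue; // skip logs that already failed
            }
            double ratio;
            try {
                ratio = dirtyRatio.applyAsDouble(log);
            } catch (RuntimeException e) {
                // We know exactly which log failed, so only that one is marked uncleanable.
                uncleanable.add(log);
                continue;
            }
            if (ratio > bestRatio) {
                best = log;
                bestRatio = ratio;
            }
        }
        return Optional.ofNullable(best);
    }

    Set<String> uncleanable() {
        return new HashSet<>(uncleanable);
    }

    public static void main(String[] args) {
        FilthiestLogSelector selector = new FilthiestLogSelector();
        Optional<String> pick = selector.grabFilthiest(List.of("a-0", "b-0", "c-0"), log -> {
            if (log.equals("b-0")) {
                throw new IllegalStateException("simulated failure for " + log);
            }
            return log.equals("a-0") ? 0.3 : 0.7;
        });
        System.out.println("cleaning " + pick.orElse("nothing") + ", uncleanable " + selector.uncleanable());
    }
}
```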
[jira] [Resolved] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments
[ https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GEORGE LI resolved KAFKA-8663. -- Resolution: Won't Fix It looks like RAR + OAR is required for KIP-455 to preserve the exact ordering of targetReplicas, and the old replicas that need to be dropped are kept in the new removingReplicas of the /brokers/topics/ zk node. > partition assignment would be better original_assignment + new_reassignment > during reassignments > > > Key: KAFKA-8663 > URL: https://issues.apache.org/jira/browse/KAFKA-8663 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.1.1, 2.3.0 >Reporter: GEORGE LI >Priority: Minor > > From my observation/experience, the replica ordering of the partition assignment changes during reassignment, because the assignment is a set union of OAR and RAR (original replicas + reassignment replicas). > However, the preferred leaders seem to change during the reassignment. Normally, if no preferred leader election runs in the cluster, the leader stays the old leader. But if a leader election happens during the reassignment, leadership changes, which caused some side effects. Let's look at this example. > {code} > Topic:georgeli_test PartitionCount:8 ReplicationFactor:3 Configs: > Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1026,1028,1025 Isr: 1026,1028,1025 > {code} > reassignment (1026,1028,1025) => (1027,1025,1028) > {code} > Topic:georgeli_test PartitionCount:8 ReplicationFactor:4 Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 > Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1027,1025,1028,1026 Isr: 1026,1028,1025 > {code} > Notice the above: the leader remains 1026, but Replicas is 1027,1025,1028,1026. > If we run preferred leader election, it will try 1027 first, then 1025. > After 1027 is in the ISR, the final assignment will be (1027,1025,1028).
> > My proposal for a minor improvement is to keep the original replica ordering first during the reassignment (which can take a long time for big topic/partitions), and only after all replicas are in the ISR, finally set the partition assignment to the new reassignment. > {code} > val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet > //1. Update AR in ZK with OAR + RAR. > updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq) > {code} > The above code would change to the following to keep the original ordering first during reassignment: > {code} > val newAndOldReplicas = (controllerContext.partitionReplicaAssignment(topicPartition) ++ reassignedPartitionContext.newReplicas).toSet > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
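The difference between RAR + OAR and OAR + RAR comes down to the order of an order-preserving union, where the first replica is the preferred leader. Here is a small Java illustration using the example assignment above. It relies on `LinkedHashSet`, which keeps insertion order (a plain `HashSet` would not). The class name is hypothetical, and the code only illustrates the ordering, not the controller's reassignment logic (Java 9+).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the two orderings discussed in KAFKA-8663.
final class ReassignmentOrdering {

    // Order-preserving union: all of `first` in order, then elements of `second` not yet seen.
    static List<Integer> orderedUnion(List<Integer> first, List<Integer> second) {
        Set<Integer> union = new LinkedHashSet<>(first);
        union.addAll(second);
        return new ArrayList<>(union);
    }

    public static void main(String[] args) {
        List<Integer> oar = List.of(1026, 1028, 1025); // original assignment
        List<Integer> rar = List.of(1027, 1025, 1028); // reassignment target
        List<Integer> rarThenOar = orderedUnion(rar, oar);
        List<Integer> oarThenRar = orderedUnion(oar, rar);
        // The first replica in each list is the preferred leader.
        System.out.println("RAR + OAR: " + rarThenOar + ", preferred leader " + rarThenOar.get(0));
        System.out.println("OAR + RAR: " + oarThenRar + ", preferred leader " + oarThenRar.get(0));
    }
}
```

RAR + OAR yields 1027,1025,1028,1026, matching the "Replicas:" line in the example, so 1027 becomes the preferred leader mid-reassignment. OAR + RAR keeps 1026 first.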
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884787#comment-16884787 ] GEORGE LI commented on KAFKA-8638: -- Here is the KIP: [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982] > Preferred Leader Blacklist (deprioritized list) > --- > > Key: KAFKA-8638 > URL: https://issues.apache.org/jira/browse/KAFKA-8638 > Project: Kafka > Issue Type: Improvement > Components: config, controller, core >Affects Versions: 1.1.1, 2.3.0, 2.2.1 >Reporter: GEORGE LI >Assignee: GEORGE LI >Priority: Major > > Currently, Kafka's preferred leader election picks the broker_id from the topic/partition replica assignment in priority order, provided the broker is in the ISR. The preferred leader is the broker id in the first position of the replica list. > There are use cases where, even though the first broker in the replica assignment is in the ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. > Let's use topic/partition replicas (1,2,3) as an example. 1 is the preferred leader. When preferred leader election is run, it picks 1 as the leader if 1 is in the ISR; if 1 is not online and in the ISR, it picks 2; if 2 is not in the ISR either, it picks 3 as the leader. In some use cases, even if 1 is in the ISR, we would like it to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: > * If broker_id 1 is a swapped-in replacement for a failed host and is brought up with only the last segments or the latest offset, without historical data (there is another effort on this), it's better for it not to serve leadership until it has caught up. > * The cross-data center cluster has AWS instances, which have less computing power than the on-prem bare metal machines.
We could put the AWS broker_ids in the Preferred Leader Blacklist, so that on-prem brokers are elected leaders without changing the replica ordering of the assignments. > * If broker_id 1 keeps losing leadership after some time ("flapping"), we would want to exclude 1 from being a leader unless all other brokers of this topic/partition are offline. The "flapping" effect was seen in the past when 2 or more brokers were bad: as they lost leadership constantly and quickly, the partitions whose replica sets include them saw leadership change constantly. The ultimate solution is to swap out these bad hosts, but for quick mitigation we can also put them in the Preferred Leader Blacklist to move their priority for being elected leader to the lowest. > * If the controller is busy serving an extra load of metadata requests and other tasks, we would like to move the controller's leaderships to other brokers to lower its CPU load. Currently, bouncing the broker to shed leadership does not work for the controller, because after the bounce the controller fails over to another broker. > * Avoid bouncing a broker just to shed its leadership: it would be good to have a way to specify which broker should be excluded from serving traffic/leadership (without changing the replica assignment ordering through reassignments, even though that is quick) and then run preferred leader election. Bouncing a broker causes temporary URPs and sometimes other issues. Also, a bounced broker (e.g. broker_id 1) temporarily loses all its leadership, but if another broker (e.g. broker_id 2) then fails or is bounced, some of its leaderships will likely fail over to broker_id 1 for partitions with 3 replicas. If broker_id 1 is in the blacklist, then in such a scenario, even with broker_id 2 offline, the third broker can take leadership.
> The current workaround for the above is to change the topic/partition's replica assignment to move broker_id 1 from the first position to the last and then run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the replica assignment, so we need to keep track of the original one and restore it when things change (e.g. the controller fails over to another broker, or the swapped-in empty broker has caught up). That's a rather tedious task. > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
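The bookkeeping that makes the current workaround tedious can be sketched in Java. The sketch moves a broker to the end of one partition's assignment and remembers the original so it can be restored later. Class and method names are hypothetical, and the reordered list is only what an operator would submit as a reassignment. Nothing here talks to a cluster (Java 10+).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical bookkeeping for the manual workaround in KAFKA-8638: move a broker to the
// end of a partition's assignment and remember the original so it can be restored later.
final class AssignmentWorkaround {

    private final Map<String, List<Integer>> originals = new HashMap<>();

    // Returns the reordered assignment, e.g. (1, 2, 3) with broker 1 -> (2, 3, 1).
    List<Integer> demote(String partition, List<Integer> assignment, int broker) {
        originals.putIfAbsent(partition, List.copyOf(assignment)); // keep the first original only
        List<Integer> reordered = new ArrayList<>(assignment);
        if (reordered.remove(Integer.valueOf(broker))) { // remove by value, not by index
            reordered.add(broker);
        }
        return reordered;
    }

    // Returns the saved original assignment (and forgets it), if one was recorded.
    Optional<List<Integer>> restore(String partition) {
        return Optional.ofNullable(originals.remove(partition));
    }

    public static void main(String[] args) {
        AssignmentWorkaround w = new AssignmentWorkaround();
        System.out.println(w.demote("topic-0", List.of(1, 2, 3), 1));
        System.out.println(w.restore("topic-0"));
    }
}
```

Every demoted partition needs an entry like this that has to survive controller failover. That state is what a deprioritized list avoids by never touching the assignment.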
[jira] [Commented] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments
[ https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884526#comment-16884526 ] GEORGE LI commented on KAFKA-8663: -- As we can see from the original comments of the code: {code} //1. Update AR in ZK with OAR + RAR. {code} But in the actual implementation, it's doing: RAR + OAR instead (different ordering). > partition assignment would be better original_assignment + new_reassignment > during reassignments > > > Key: KAFKA-8663 > URL: https://issues.apache.org/jira/browse/KAFKA-8663 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.1.1, 2.3.0 >Reporter: GEORGE LI >Priority: Minor > > From my observation/experience during reassignment, the partition assignment > replica ordering gets changed. because it's OAR + RAR (original replicas > + reassignment replicas) set union. > However, it seems like the preferred leaders changed during the > reassignments. Normally if there is no cluster preferred leader election, > the leader is still the old leader. But if during the reassignments, there > is a leader election, the leadership changes. This caused some side > effects. Let's look at this example. > {code} > Topic:georgeli_test PartitionCount:8ReplicationFactor:3 Configs: > Topic: georgeli_testPartition: 0Leader: 1026Replicas: > 1026,1028,1025Isr: 1026,1028,1025 > {code} > reassignment (1026,1028,1025) => (1027,1025,1028) > {code} > Topic:georgeli_test PartitionCount:8ReplicationFactor:4 > Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 > Topic: georgeli_testPartition: 0Leader: 1026Replicas: > 1027,1025,1028,1026 Isr: 1026,1028,1025 > {code} > Notice the above: Leader remains 1026. but Replicas: 1027,1025,1028,1026. > If we run preferred leader election, it will try 1027 first, then 1025. > After 1027 is in ISR, then the final assignment will be (1027,1025,1028). 
> > My proposal for a minor improvement is to keep the original replica ordering > during the reassignment (which could take a long time for big topics/partitions), and after > all replicas are in ISR, finally set the partition assignment to the new > reassignment. > {code} > val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ > controllerContext.partitionReplicaAssignment(topicPartition)).toSet > //1. Update AR in ZK with OAR + RAR. > updateAssignedReplicasForPartition(topicPartition, > newAndOldReplicas.toSeq) > {code} > The above code would change to the following to keep the original ordering first during > reassignment: > {code} > val newAndOldReplicas = > (controllerContext.partitionReplicaAssignment(topicPartition) ++ > reassignedPartitionContext.newReplicas).toSet > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
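The ordering difference can be reproduced with plain Scala collections. This is a hypothetical sketch, not the controller code: `unionRarFirst` mirrors the current `newReplicas ++ partitionReplicaAssignment` expression, `unionOarFirst` mirrors the proposed one, and `preferredLeader` models only the selection rule described in the issue (walk the replica list in order and take the first broker in the ISR), which is a simplification of Kafka's election logic. It uses `distinct` instead of `toSet`, since a Scala `Set` does not promise to keep insertion order.

```scala
object ReplicaOrderingSketch {
  // Current behaviour described in the issue: RAR ++ OAR with duplicates removed.
  // distinct keeps the first occurrence of each broker id, so the order is stable.
  def unionRarFirst(original: Seq[Int], reassigned: Seq[Int]): Seq[Int] =
    (reassigned ++ original).distinct

  // Proposed behaviour: OAR ++ RAR, so the original preferred leader stays first.
  def unionOarFirst(original: Seq[Int], reassigned: Seq[Int]): Seq[Int] =
    (original ++ reassigned).distinct

  // Simplified selection rule from the issue text: the first replica,
  // in assignment order, that is currently in the ISR.
  def preferredLeader(replicas: Seq[Int], isr: Set[Int]): Option[Int] =
    replicas.find(isr.contains)

  def main(args: Array[String]): Unit = {
    val oar = Seq(1026, 1028, 1025)
    val rar = Seq(1027, 1025, 1028)
    val isr = Set(1026, 1028, 1025) // 1027 has not caught up yet

    val current = unionRarFirst(oar, rar)
    val proposed = unionOarFirst(oar, rar)
    // prints: RAR + OAR: 1027,1025,1028,1026 -> leader Some(1025)
    println(s"RAR + OAR: ${current.mkString(",")} -> leader ${preferredLeader(current, isr)}")
    // prints: OAR + RAR: 1026,1028,1025,1027 -> leader Some(1026)
    println(s"OAR + RAR: ${proposed.mkString(",")} -> leader ${preferredLeader(proposed, isr)}")
  }
}
```

Under this simplified rule, the RAR-first order lets an election during the reassignment move leadership from 1026 to 1025 even though 1026 is still in the ISR, while the OAR-first order keeps it on 1026 until the final assignment is written.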
[jira] [Updated] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments
[ https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GEORGE LI updated KAFKA-8663: - Description: From my observation/experience during reassignment, the partition assignment replica ordering gets changed because it's the OAR + RAR (original replicas + reassignment replicas) set union. It seems that the preferred leaders change during the reassignments. Normally, if no preferred leader election is run on the cluster, the leader is still the old leader. But if there is a leader election during the reassignments, the leadership changes. This caused some side effects. Let's look at this example. {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:3 Configs: Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1026,1028,1025 Isr: 1026,1028,1025 {code} reassignment (1026,1028,1025) => (1027,1025,1028) {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:4 Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1027,1025,1028,1026 Isr: 1026,1028,1025 {code} Notice the above: the leader remains 1026, but Replicas: 1027,1025,1028,1026. If we run preferred leader election, it will try 1027 first, then 1025. After 1027 is in ISR, the final assignment will be (1027,1025,1028). My proposal for a minor improvement is to keep the original replica ordering during the reassignment (which could take a long time for big topics/partitions), and after all replicas are in ISR, finally set the partition assignment to the new reassignment. {code} val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet //1. Update AR in ZK with OAR + RAR.
updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq) {code} The above code would change to the following to keep the original ordering first during reassignment: {code} val newAndOldReplicas = (controllerContext.partitionReplicaAssignment(topicPartition) ++ reassignedPartitionContext.newReplicas).toSet {code} was: From my observation/experience during reassignment, the partition assignment replica ordering gets changed because it's the OAR + RAR (original replicas + reassignment replicas) set union. It seems that the preferred leaders change during the reassignments. Normally, if no preferred leader election is run on the cluster, the leader is still the old leader. But if there is a leader election during the reassignments, the leadership changes. This caused some side effects. Let's look at this example. {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:3 Configs: Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1026,1028,1025 Isr: 1026,1028,1025 {code} reassignment (1026,1028,1025) => (1027,1025,1028) {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:4 Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1027,1025,1028,1026 Isr: 1026,1028,1025 {code} Notice the above: the leader remains 1026, but Replicas: 1027,1025,1028,1026. If we run preferred leader election, it will try 1027 first, then 1025. After 1027 is in ISR, the final assignment will be (1027,1025,1028). My proposal for a minor improvement is to keep the original replica ordering during the reassignment (which could take a long time for big topics/partitions), and after all replicas are in ISR, finally set the partition assignment to the new reassignment. {code} val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet //1. Update AR in ZK with OAR + RAR.
updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq) {code} The above code would change to the following to keep the original ordering during reassignment: {code} val newAndOldReplicas = (controllerContext.partitionReplicaAssignment(topicPartition) ++ reassignedPartitionContext.newReplicas).toSet {code}
[jira] [Created] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments
GEORGE LI created KAFKA-8663: Summary: partition assignment would be better original_assignment + new_reassignment during reassignments Key: KAFKA-8663 URL: https://issues.apache.org/jira/browse/KAFKA-8663 Project: Kafka Issue Type: Improvement Components: controller, core Affects Versions: 2.3.0, 1.1.1 Reporter: GEORGE LI From my observation/experience during reassignment, the partition assignment replica ordering gets changed because it's the OAR + RAR (original replicas + reassignment replicas) set union. It seems that the preferred leaders change during the reassignments. Normally, if no preferred leader election is run on the cluster, the leader is still the old leader. But if there is a leader election during the reassignments, the leadership changes. This caused some side effects. Let's look at this example. {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:3 Configs: Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1026,1028,1025 Isr: 1026,1028,1025 {code} reassignment (1026,1028,1025) => (1027,1025,1028) {code} Topic:georgeli_test PartitionCount:8 ReplicationFactor:4 Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: 1027,1025,1028,1026 Isr: 1026,1028,1025 {code} Notice the above: the leader remains 1026, but Replicas: 1027,1025,1028,1026. If we run preferred leader election, it will try 1027 first, then 1025. After 1027 is in ISR, the final assignment will be (1027,1025,1028). My proposal for a minor improvement is to keep the original replica ordering during the reassignment (which could take a long time for big topics/partitions), and after all replicas are in ISR, finally set the partition assignment to the new reassignment. {code} val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet //1. Update AR in ZK with OAR + RAR.
updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq) {code} The above code would change to the following to keep the original ordering during reassignment: {code} val newAndOldReplicas = (controllerContext.partitionReplicaAssignment(topicPartition) ++ reassignedPartitionContext.newReplicas).toSet {code}
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881480#comment-16881480 ] GEORGE LI commented on KAFKA-8638: -- Hi Jose, A broker can have hundreds or thousands of topic/partitions assigned to it. Doing reassignments to move it to the end and lower its priority, then remembering the original ordering to restore later, is much more tedious than simply putting it in the "deprioritized list" for some time and removing it when certain conditions are improved/met. We have a Rebalance Tool which rebalances the whole cluster; it's better not to keep changing the replica ordering of the assignments constantly. With the "deprioritized list", it's cleaner. Let's just take the use case of taking the controller out of being a leader/serving traffic, leaving it only as a follower. We observed that a broker not serving any leaders has lower CPU utilization. For clusters where the busy controller does more work than other brokers, we would like it not to take any leaders. Right now, for a broker to lose leadership, we need to bounce the broker. In this case, if we bounce it, the controller fails over to another broker. If we change the ordering of the current assignments for the controller, then the next time the controller fails over, we need to do the same. For managing the "deprioritized list", the user (e.g. the on-call engineer seeing issues with a broker that should not serve leadership traffic) should have the ability to add/remove brokers. My initial thought on how to store this "deprioritized list" is one of the 2 approaches below: * Design #1: Introduce a Preferred Leader Blacklist, e.g. ZK path/node: /preferred_leader_blacklist/ Direct manipulation of ZK should be avoided as Kafka is moving toward being RPC based, so a new Request/Response RPC call is needed. In the current design, no ZK watcher on this ZK node's children is needed to trigger leadership changes. * Design #2: Introduce a preferred_leader_blacklist dynamic config which is empty by default.
It accepts a list of broker IDs separated by commas. E.g. below, broker IDs 1, 10 and 65 are put into the blacklist. {code} /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config preferred_leader_blacklist=1,10,65 {code} Since Kafka dynamic config already uses --bootstrap-server, it does not need to manipulate ZooKeeper directly. The downside: when adding/removing one broker, instead of working with one ZK node per broker as in Design #1 above, the dynamic config needs to be updated with a new complete list. E.g. in order to remove broker 10 from the blacklist, update preferred_leader_blacklist=1,65. In the current design, the dynamic config should not trigger any leadership changes. > Preferred Leader Blacklist (deprioritized list) > --- > > Key: KAFKA-8638 > URL: https://issues.apache.org/jira/browse/KAFKA-8638 > Project: Kafka > Issue Type: Improvement > Components: config, controller, core >Affects Versions: 1.1.1, 2.3.0, 2.2.1 >Reporter: GEORGE LI >Assignee: GEORGE LI >Priority: Major > > Currently, Kafka preferred leader election picks the broker_id from the > topic/partition replica assignment in priority order, provided the broker is in > ISR. The preferred leader is the broker id in the first position of the replica list. > There are use cases where, even if the first broker in the replica assignment is > in ISR, it needs to be moved to the end of the ordering (lowest > priority) when deciding leadership during preferred leader election. > Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred > leader. When preferred leader election is run, it will pick 1 as the leader if > it's in ISR; if 1 is not online and in ISR, it picks 2; if 2 is not in ISR, > it picks 3 as the leader.
There are use cases where, even if 1 is in ISR, we > would like it to be moved to the end of the ordering (lowest priority) when > deciding leadership during preferred leader election. Below is a list of > use cases: > * If broker_id 1 is a swapped failed host brought up with only the last segments > or latest offset and no historical data (there is another effort on this), > it's better for it not to serve leadership until it has caught up. > * The cross-data center cluster has AWS instances which have less computing > power than the on-prem bare metal machines. We could put the AWS broker_ids > in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders > without changing the replica ordering through reassignments. > * If broker_id 1 is constantly losing leadership after some time > ("flapping"), we would want to exclude 1 from being a leader unless all other
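The deprioritized-list behaviour can be modelled in a few lines. This is a hypothetical sketch rather than Kafka code: `preferred_leader_blacklist` is only the config name proposed in Design #2, the leader rule (first broker in order that is in the ISR) follows the issue's description, and deprioritized brokers are moved to the end of the election order without touching the stored assignment, which is what the manual (1, 2, 3) => (2, 3, 1) work-around achieves by reassignment.

```scala
object DeprioritizedLeaderSketch {
  // Splits a comma-separated broker id list such as "1,10,65", keeping its order.
  private def ids(value: String): Seq[Int] =
    value.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map(_.toInt)

  // Reads a value of the proposed preferred_leader_blacklist setting (hypothetical config).
  def parseBrokerList(value: String): Set[Int] = ids(value).toSet

  // Design #2 downside: removing one broker means writing back the full remaining list.
  def withoutBroker(value: String, brokerId: Int): String =
    ids(value).filterNot(_ == brokerId).mkString(",")

  // Pushes deprioritized brokers to the end without changing the stored assignment,
  // e.g. (1, 2, 3) with {1} deprioritized is tried as (2, 3, 1).
  def electionOrder(assignment: Seq[Int], deprioritized: Set[Int]): Seq[Int] = {
    val (low, normal) = assignment.partition(deprioritized)
    normal ++ low
  }

  // First broker in election order that is in the ISR. A deprioritized broker is
  // still chosen when no other replica is eligible.
  def electLeader(assignment: Seq[Int], isr: Set[Int], deprioritized: Set[Int]): Option[Int] =
    electionOrder(assignment, deprioritized).find(isr)

  def main(args: Array[String]): Unit = {
    val blacklist = parseBrokerList("1,10,65")
    println(electLeader(Seq(1, 2, 3), Set(1, 2, 3), blacklist)) // prints Some(2)
    println(electLeader(Seq(1, 2, 3), Set(1), blacklist))       // prints Some(1)
    println(withoutBroker("1,10,65", 10))                       // prints 1,65
  }
}
```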
[jira] [Updated] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GEORGE LI updated KAFKA-8638: - Description: Currently, Kafka preferred leader election picks the broker_id from the topic/partition replica assignment in priority order, provided the broker is in ISR. The preferred leader is the broker id in the first position of the replica list. There are use cases where, even if the first broker in the replica assignment is in ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred leader. When preferred leader election is run, it will pick 1 as the leader if it's in ISR; if 1 is not online and in ISR, it picks 2; if 2 is not in ISR, it picks 3 as the leader. There are use cases where, even if 1 is in ISR, we would like it to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: * If broker_id 1 is a swapped failed host brought up with only the last segments or latest offset and no historical data (there is another effort on this), it's better for it not to serve leadership until it has caught up. * The cross-data center cluster has AWS instances which have less computing power than the on-prem bare metal machines. We could put the AWS broker_ids in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders without changing the replica ordering through reassignments. * If broker_id 1 is constantly losing leadership after some time ("flapping"), we would want to exclude 1 from being a leader unless all other brokers of this topic/partition are offline. The “Flapping” effect was seen in the past when 2 or more brokers were bad: as they lost leadership constantly/quickly, the partitions whose replica sets they belong to saw leadership constantly changing.
The ultimate solution is to swap these bad hosts. But for quick mitigation, we can also put the bad hosts in the Preferred Leader Blacklist to lower their priority for being elected leaders to the lowest. * If the controller is busy serving an extra load of metadata requests and other tasks, we would like to move the controller's leaders to other brokers to lower its CPU load. Currently, bouncing to lose leadership does not work for the controller, because after the bounce the controller fails over to another broker. * Avoid bouncing a broker in order to make it lose leadership: it would be good to have a way to specify which broker should be excluded from serving traffic/leadership (without changing the replica assignment ordering through reassignments, even though that's quick) and then run preferred leader election. A bouncing broker will cause temporary URPs, and sometimes other issues. Also, bouncing a broker (e.g. broker_id 1) can make it temporarily lose all its leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, some of its leadership will likely fail over to broker_id 1 for a replica set with 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, even with broker_id 2 offline, the 3rd broker can take leadership. The current work-around is to change the topic/partition's replica assignment, moving broker_id 1 from the first position to the last position, and run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the replica assignment, and we need to keep track of the original one and restore it if things change (e.g. the controller fails over to another broker, or the swapped empty broker has caught up). That’s a rather tedious task. was: Currently, Kafka preferred leader election picks the broker_id from the topic/partition replica assignment in priority order, provided the broker is in ISR. The preferred leader is the broker id in the first position of the replica list.
There are use cases where, even if the first broker in the replica assignment is in ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred leader. When preferred leader election is run, it will pick 1 as the leader if it's in ISR; if 1 is not online and in ISR, it picks 2; if 2 is not in ISR, it picks 3 as the leader. There are use cases where, even if 1 is in ISR, we would like it to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: # If broker_id 1 is a swapped failed host brought up with only the last segments or latest offset and no historical data (there is another effort on this), it's better for it not to serve leadership until it has caught up. # The cross-data center cluster has AWS instances which
[jira] [Created] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
GEORGE LI created KAFKA-8638: Summary: Preferred Leader Blacklist (deprioritized list) Key: KAFKA-8638 URL: https://issues.apache.org/jira/browse/KAFKA-8638 Project: Kafka Issue Type: Improvement Components: config, controller, core Affects Versions: 2.2.1, 2.3.0, 1.1.1 Reporter: GEORGE LI Assignee: GEORGE LI Currently, Kafka preferred leader election picks the broker_id from the topic/partition replica assignment in priority order, provided the broker is in ISR. The preferred leader is the broker id in the first position of the replica list. There are use cases where, even if the first broker in the replica assignment is in ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred leader. When preferred leader election is run, it will pick 1 as the leader if it's in ISR; if 1 is not online and in ISR, it picks 2; if 2 is not in ISR, it picks 3 as the leader. There are use cases where, even if 1 is in ISR, we would like it to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: # If broker_id 1 is a swapped failed host brought up with only the last segments or latest offset and no historical data (there is another effort on this), it's better for it not to serve leadership until it has caught up. # The cross-data center cluster has AWS instances which have less computing power than the on-prem bare metal machines. We could put the AWS broker_ids in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders without changing the replica ordering through reassignments. # If broker_id 1 is constantly losing leadership after some time ("flapping"), we would want to exclude 1 from being a leader unless all other brokers of this topic/partition are offline.
The “Flapping” effect was seen in the past when 2 or more brokers were bad: as they lost leadership constantly/quickly, the partitions whose replica sets they belong to saw leadership constantly changing. The ultimate solution is to swap these bad hosts. But for quick mitigation, we can also put the bad hosts in the Preferred Leader Blacklist to lower their priority for being elected leaders to the lowest. # If the controller is busy serving an extra load of metadata requests and other tasks, we would like to move the controller's leaders to other brokers to lower its CPU load. Currently, bouncing to lose leadership does not work for the controller, because after the bounce the controller fails over to another broker. # Avoid bouncing a broker in order to make it lose leadership: it would be good to have a way to specify which broker should be excluded from serving traffic/leadership (without changing the replica assignment ordering through reassignments, even though that's quick) and then run preferred leader election. A bouncing broker will cause temporary URPs, and sometimes other issues. Also, bouncing a broker (e.g. broker_id 1) can make it temporarily lose all its leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, some of its leadership will likely fail over to broker_id 1 for a replica set with 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, even with broker_id 2 offline, the 3rd broker can take leadership. The current work-around is to change the topic/partition's replica assignment, moving broker_id 1 from the first position to the last position, and run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the replica assignment, and we need to keep track of the original one and restore it if things change (e.g. the controller fails over to another broker, or the swapped empty broker has caught up). That’s a rather tedious task.
[jira] [Commented] (KAFKA-8527) add dynamic maintenance broker config
[ https://issues.apache.org/jira/browse/KAFKA-8527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868914#comment-16868914 ] GEORGE LI commented on KAFKA-8527: -- We had this implemented in a separate admin service outside of Kafka. One of its functions handles topic-related changes, e.g. topic creation and topic partition expansion. There is a ZK path/node which indicates which broker_ids are under "maintenance", so when creating a topic or expanding a topic's partitions, the service excludes these live broker_ids from getting any new replica assignments. > add dynamic maintenance broker config > - > > Key: KAFKA-8527 > URL: https://issues.apache.org/jira/browse/KAFKA-8527 > Project: Kafka > Issue Type: Improvement >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > Before we remove a broker for maintenance, we want to move all partitions > off the broker first to avoid introducing new Under Replicated Partitions > (URPs). That is because shutting down (or killing) a broker that still hosts > live partitions will temporarily reduce the replicas of those > partitions. Moving partitions out of a broker can be done via partition > reassignment. However, during the partition reassignment process, new topics > can be created by Kafka and thereby new partitions can be added to the broker > that is pending removal. As a result, the removal process will need to > repeatedly move new topic partitions out of the maintenance broker. In a > production environment in which topic creation is frequent and URPs caused by > broker removal cannot be tolerated, the removal process can take multiple > iterations to complete the partition reassignment. We want to provide a > mechanism to mark a broker as a maintenance broker (via cluster-level dynamic > configuration). One action Kafka can take for the maintenance broker is not > to assign new topic partitions to it, and thereby facilitate the broker > removal.
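A maintenance marker of this kind acts as a filter applied before new replicas are placed. The sketch below is hypothetical: it uses plain round-robin placement, which is much simpler than Kafka's real assignment logic (no rack awareness, no randomized starting broker), and a `maintenance` set standing in for the ZK node or the proposed dynamic config.

```scala
object MaintenanceAwareAssignment {
  // Places `replicationFactor` replicas for each new partition round-robin over live
  // brokers that are not marked as under maintenance. Simplified on purpose: no rack
  // awareness and no randomized starting broker, unlike Kafka's own assignment.
  def assign(partitions: Int, replicationFactor: Int,
             liveBrokers: Seq[Int], maintenance: Set[Int]): Map[Int, Seq[Int]] = {
    val eligible = liveBrokers.filterNot(maintenance).sorted
    require(eligible.size >= replicationFactor,
      s"only ${eligible.size} eligible brokers for replication factor $replicationFactor")
    (0 until partitions).map { p =>
      // Partition p starts at eligible broker p and takes the next brokers in order.
      p -> (0 until replicationFactor).map(r => eligible((p + r) % eligible.size))
    }.toMap
  }
}
```

For example, `assign(3, 2, Seq(1, 2, 3, 4), Set(3))` spreads the replicas over brokers 1, 2 and 4 only, so broker 3 can be drained by reassignment without new partitions landing on it in the meantime.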
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777257#comment-16777257 ] GEORGE LI commented on KAFKA-6794: -- Hi [~viktorsomogyi], I think this "Incremental Reassignment" is different from the "Planned Future Change" section of KIP-236. That one basically tries to overcome the current limitation that only one batch of reassignments can run in /admin/reassign_partitions. E.g. a batch of 50 reassignments is submitted and 49 complete, but one long-running reassignment is still pending in /admin/reassign_partitions. Currently, a new batch cannot be submitted until everything in /admin/reassign_partitions has completed and the node is removed from ZK. If the cluster is otherwise mostly idle, this wastes resources because no new reassignments can be submitted. The proposal is to allow submitting a new batch to a queue (ZK node) and merging the new assignments into /admin/reassign_partitions, using Cancel Reassignments if there is a conflict (the same topic/partition) between the new queue and the current /admin/reassign_partitions. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi-Vass >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment.
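The queue-and-merge proposal in the first comment can be sketched as a pure function over two maps. All names are hypothetical: partitions are plain strings such as "t-0", `inFlight` stands for the contents of /admin/reassign_partitions and `queued` for the proposed queue node. Partitions present in both with different targets are reported so their in-flight move can be cancelled before the queued target is applied; no real ZooKeeper or Admin API call is assumed.

```scala
object ReassignmentQueueMerge {
  // toCancel: partitions whose in-flight target differs from the queued one.
  // merged:   what the reassignment node would hold after the merge.
  final case class MergeResult(toCancel: Set[String], merged: Map[String, Seq[Int]])

  def merge(inFlight: Map[String, Seq[Int]], queued: Map[String, Seq[Int]]): MergeResult = {
    // A partition in both maps is a conflict; an identical target needs no cancel.
    val toCancel = inFlight.keySet.intersect(queued.keySet)
      .filter(tp => inFlight(tp) != queued(tp))
    // Map ++ keeps the right-hand value for duplicate keys, so queued targets win.
    MergeResult(toCancel, inFlight ++ queued)
  }
}
```

Keeping the merge as a pure function separates the decision (what to cancel, what to write) from the side effects, which is where an external tool would call the actual cancel and submit operations.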
[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777149#comment-16777149 ] GEORGE LI edited comment on KAFKA-6794 at 2/25/19 7:56 PM: --- Hi, [~viktorsomogyi], When I rebalance the whole cluster, I generate the reassignment plan JSON with a list of topic/partitions and their new_replicas/original_replicas, and sort them by size, trying to group them into batches of similar sizes for execution so that each batch is expected to complete reassignment in about the same amount of time. Say there are 1000 reassignments and 50 per batch. That will be at least 20 batches/buckets to execute. There could be more than 20 batches because a reassignment like (1,2,3,4) => (5,6,7,8) can be split into 4 reassignments in 4 batches. A batch is submitted, and an execution program keeps checking for the existence of /admin/reassign_partitions before submitting the next batch. By comparing the new_replicas vs. original_replicas sets, the algorithm can detect whether there is more than 1 new replica in new_replicas; if yes, it breaks the reassignment up and puts the pieces in different batches/buckets. There are other considerations for the reassignments in the same batch: e.g. for different topic/partitions, try to spread the load and not overwhelm a leader; the leadership bytes of reassignments within the same batch should be balanced across all brokers/leaders in the cluster as much as possible. The same goes for new followers (spread across the cluster so as not to overwhelm a particular follower). I think this (optimal execution of reassignment plans in batches) can be achieved outside of Kafka.
was (Author: sql_consulting): Hi, [~viktorsomogyi], When I rebalance the whole cluster, I generate the reassignment plan JSON with a list of topic/partitions and their new_replicas/original_replicas, and sort them by size, trying to group them into batches of similar sizes for execution so that each batch is expected to complete reassignment in about the same amount of time. Say there are 1000 reassignments and 50 per batch. That will be at least 20 batches/buckets to execute. By comparing the new_replicas vs. original_replicas sets, the algorithm can detect whether there is more than 1 new replica in new_replicas; if yes, it breaks the reassignment up and puts the pieces in different batches/buckets. There are other considerations for the reassignments in the same batch: e.g. for different topic/partitions, try to spread the load and not overwhelm a leader; the leadership bytes of reassignments within the same batch should be balanced across all brokers/leaders in the cluster as much as possible. I think this (optimal execution of reassignment plans in batches) can only be achieved outside of Kafka.
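The batching approach in this comment could look roughly like the planner below. It is a hypothetical sketch, not part of Kafka: each `Move` carries a partition name, its size in bytes, and the original and target replica lists. Moves that add more than one new replica are set aside for splitting into single-replica steps; the rest are sorted by size and cut into batches of `batchSize` (50 in the comment). Balancing leader and follower bytes across brokers within a batch is not modelled.

```scala
object SizeSortedBatches {
  final case class Move(partition: String, sizeBytes: Long,
                        original: Seq[Int], target: Seq[Int]) {
    // Replicas in the target that do not already host the partition.
    def newReplicaCount: Int = target.count(r => !original.contains(r))
  }

  // Returns (batches of moves with at most one new replica, moves that still need splitting).
  def plan(moves: Seq[Move], batchSize: Int): (List[Seq[Move]], Seq[Move]) = {
    require(batchSize > 0, "batchSize must be positive")
    val (ready, needsSplit) = moves.partition(_.newReplicaCount <= 1)
    // Largest first, so each batch groups partitions of similar size.
    val batches = ready.sortBy(_.sizeBytes)(Ordering[Long].reverse).grouped(batchSize).toList
    (batches, needsSplit)
  }
}
```

Sorting before chunking is what makes neighbouring moves land in the same batch, so one very large partition does not hold up many small ones while the next batch waits.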
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777149#comment-16777149 ] GEORGE LI commented on KAFKA-6794: -- Hi, [~viktorsomogyi], When I rebalance the whole cluster, I generate the reassignment plan json with a list of topic/partitions with its new_replicas/original_replicas, and sort them by their size, so try to group them in batches of similar sizes for execution, so that they are expected to complete reassignment using about the same amount of time. Say there are 1000 reassignments, and 50 per batch. That will be at least 20 batches/buckets to put in for execution. Comparing the new_replicas Vs. original_replicas set, the algorithm can detect if there is more than 1 new replica in the new_replicas, if yes, then break it and put in different batch/bucket.There are other considerations of the reassignments in the same batch: e.g. for different topic/partition, try to spread the load and not to overwhelm a Leader. e.g. the Leadership bytes within the same batch for reassignments should be balanced across all brokers/leaders in the cluster as much as possible. I think this (optimal executions of reassignment plans in batches) can only be achieved outside of Kafka. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi-Vass >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. 
Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. > Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
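Below is a minimal Python sketch of the batch planning described in the comment above. It rests on stated assumptions. The `Move` fields, `single_new_replica_steps`, and `plan_batches` are hypothetical names, not part of Kafka's tooling, and partition sizes are assumed to be known up front. A move that adds more than one new broker is split into steps that each add one. Step k of every move is placed in round k. Within a round, the largest partitions go first, so each batch groups partitions of similar size. Balancing leadership bytes across brokers, which the comment also mentions, is not attempted here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Move:
    """One entry of the reassignment plan JSON (hypothetical field names)."""
    topic: str
    partition: int
    original_replicas: tuple
    new_replicas: tuple
    size_bytes: int


def single_new_replica_steps(m: Move) -> list:
    """Split one move into targets that each introduce at most one new broker."""
    original, new = tuple(m.original_replicas), tuple(m.new_replicas)
    if original == new:
        return []
    added = [b for b in new if b not in original]
    if len(added) <= 1:
        return [new]
    # Old-only replicas are dropped from the tail first.
    dropped = [b for b in reversed(original) if b not in new]
    steps = []
    for k in range(1, len(added)):
        kept = [b for b in original if b not in dropped[:k]]
        steps.append(tuple(kept + added[:k]))
    steps.append(new)  # the final step lands exactly on the requested replica list
    return steps


def plan_batches(moves, batch_size: int) -> list:
    """Return batches of (topic, partition, target_replicas) to submit in order."""
    rounds = []  # rounds[k] holds step k of every move that has one
    for m in sorted(moves, key=lambda mv: mv.size_bytes, reverse=True):
        for k, target in enumerate(single_new_replica_steps(m)):
            if k == len(rounds):
                rounds.append([])
            rounds[k].append((m.topic, m.partition, target))
    # Chunk each round separately so no batch mixes two steps of one partition.
    return [rnd[i:i + batch_size]
            for rnd in rounds
            for i in range(0, len(rnd), batch_size)]
```

The batches must be submitted in order. Step k+1 of a partition always lands in a later batch than step k, so each partition gains only one new replica at a time.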
[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775598#comment-16775598 ] GEORGE LI edited comment on KAFKA-6794 at 2/22/19 9:21 PM: --- I have also seen this issue. When a reassignment has more than one new broker in its new replicas and the topic is big, the leader, even with throttling, works hard to sync all the extra followers, which can cause a latency jump. One of the solutions is to execute the reassignment plans in an "optimal" way: submit them in batches, making sure that in each batch a topic/partition has only one extra new broker in its new replicas; wait until that reassignment completes, then submit the next one. E.g. if the reassignment is (1,2,3,4) => (5,6,7,8), split it into 4 batches (buckets), each with only one new replica: {{Batch 1: (1,2,3,5)}} {{Batch 2: (1,2,5,6)}} {{Batch 3: (1,5,6,7)}} {{Batch 4: (5,6,7,8)}} Between batches, check whether the ZK node /admin/reassign_partitions exists: if it does, sleep and check again; if not, submit the next batch. was (Author: sql_consulting): I also have seen this issue. When more than one broker is in the New Replicas of the reassignments, the topic is big, even with throttle, the leader is working hard to sync to all the extra followers and could cause latency jump. {{One of solutions is execute the reassignment plans in an "Optimal" way. Submit the reassignment plans in batches. making sure each batch, the topic/partition will have only one extra New broker in the New Replicas, wait till that reassignment completes, then resubmit another one. e.g. for if the reassignment is (1,2,3,4) => (5,6,7,8). Split it in 4 batches (buckets), every batch only 1 new replica. }} {{Batch 1: (1,2,3,5)}} {{Batch 2: (1,2,5,6)}} {{Batch 3: (1,5,6,7)}} {{Batch 4: (5,6,7,8)}} Between each batch, check ZK node /admin/reassign_partitions exists, if yes, sleep and check again, if not, submit next batch. 
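Below is a minimal Python sketch of the "optimal" execution described in this comment. It assumes the caller supplies two hypothetical callbacks. `node_exists()` reports whether /admin/reassign_partitions is present, for example via a ZooKeeper client. `submit(batch)` writes one batch. `incremental_steps` reproduces the Batch 1 to Batch 4 sequence above, and `run_in_batches` is the sleep-and-recheck loop. Neither function is part of Kafka.

```python
import time
from typing import Callable, Sequence


def incremental_steps(original: Sequence[int], new: Sequence[int]) -> list:
    """Targets that each add at most one broker not present in the previous step."""
    original, new = tuple(original), tuple(new)
    to_add = [b for b in new if b not in original]
    # Remove old-only replicas from the tail first, as in the Batch 1..4 example.
    to_remove = [b for b in reversed(original) if b not in new]
    n = max(len(to_add), len(to_remove))
    if n == 0:
        return [new] if new != original else []  # pure reordering: one step
    steps = []
    for k in range(1, n + 1):
        kept = [b for b in original if b not in to_remove[:k]]
        steps.append(tuple(kept + to_add[:k]))
    steps[-1] = new  # finish on the exact requested replica order
    return steps


def run_in_batches(batches, node_exists: Callable[[], bool],
                   submit: Callable[[object], None],
                   poll_seconds: float = 10.0,
                   sleep: Callable[[float], None] = time.sleep) -> None:
    """Submit each batch only once the reassignment znode is gone."""
    for batch in batches:
        while node_exists():  # a reassignment is still in progress
            sleep(poll_seconds)
        submit(batch)
    while node_exists():  # also wait for the last batch to finish
        sleep(poll_seconds)
```

The `sleep` parameter is injectable so the loop can be exercised without real waiting. In production it defaults to `time.sleep`.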
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775598#comment-16775598 ] GEORGE LI commented on KAFKA-6794: -- I have also seen this issue. When a reassignment has more than one new broker in its new replicas and the topic is big, the leader, even with throttling, works hard to sync all the extra followers, which can cause a latency jump. One of the solutions is to execute the reassignment plans in an "optimal" way: submit them in batches, making sure that in each batch a topic/partition has only one extra new broker in its new replicas; wait until that reassignment completes, then submit the next one. E.g. if the reassignment is (1,2,3,4) => (5,6,7,8), split it into 4 batches (buckets), each with only one new replica: {{Batch 1: (1,2,3,5)}} {{Batch 2: (1,2,5,6)}} {{Batch 3: (1,5,6,7)}} {{Batch 4: (5,6,7,8)}} Between batches, check whether the ZK node /admin/reassign_partitions exists: if it does, sleep and check again; if not, submit the next batch.
[jira] [Commented] (KAFKA-7854) Behavior change in controller picking up partition reassignment tasks since 1.1.0
[ https://issues.apache.org/jira/browse/KAFKA-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773395#comment-16773395 ] GEORGE LI commented on KAFKA-7854: -- [KIP-236|https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment] is addressing the cancel/rollback of pending reassignments. WIP: KAFKA-6359 GitHub Pull Request #: [6296|https://github.com/apache/kafka/pull/6296] > Behavior change in controller picking up partition reassignment tasks since > 1.1.0 > - > > Key: KAFKA-7854 > URL: https://issues.apache.org/jira/browse/KAFKA-7854 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: Zhanxiang (Patrick) Huang >Priority: Major > > After [https://github.com/apache/kafka/pull/4143], the controller no longer > subscribes to data changes on /admin/reassign_partitions (in order to > avoid unnecessarily reloading the reassignment data after the controller > updates the znode), unlike previous Kafka versions. However, there > are systems built around Kafka that rely on the previous behavior to > incrementally update the list of partition reassignments, since Kafka does not > natively support that. > > For example, [cruise control|https://github.com/linkedin/cruise-control] can > rely on the previous behavior (the controller listening to data changes) to > maintain the reassignment concurrency by dynamically updating the data in the > reassignment znode instead of waiting for the current batch to finish and > doing the reassignment batch by batch, which can significantly reduce the > rebalance time in production clusters. Although directly updating the znode > can be viewed as an anti-pattern in the long term, it is necessary > because Kafka does not natively support incrementally submitting more reassignment > tasks. However, after our Kafka clusters migrated from 0.11 to 2.0, Cruise > Control no longer works because the controller behavior has changed. 
This > reveals the following problems: > * These behavior changes may be viewed as internal changes, so compatibility > is not guaranteed, but I think by convention people do view them as public > interfaces and rely on their compatibility. In this case, I think we should > clearly document the data contract for the partition reassignment task to > avoid misuse and to avoid controller changes that break the defined data > contract. There may be other cases (e.g. topic deletion) whose data contracts > need to be clearly defined, and we should keep them in mind when making > controller changes. > * Kafka does not natively support incrementally submitting more reassignment > tasks. If we do want to support that properly, we should consider changing how we > store the reassignment data: store it in child nodes and let the > controller listen for child node changes, similar to what we do for > /admin/delete_topics.
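Below is a toy Python comparison of the two scheduling styles the issue contrasts. It assumes each move's duration is known and independent of how many moves run concurrently. Real reassignments are throttled and interact, so this only illustrates the idle time caused by waiting for whole batches. `batch_by_batch_time` waits for a whole batch to finish before starting the next. `sliding_window_time` starts a new move as soon as any in-flight move finishes, which is what dynamically updating the reassignment znode made possible. Both function names are hypothetical.

```python
import heapq


def batch_by_batch_time(durations, concurrency: int) -> float:
    """Total time when each batch must fully finish before the next one starts."""
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    total = 0.0
    for i in range(0, len(durations), concurrency):
        total += max(durations[i:i + concurrency])  # the slowest move gates the batch
    return total


def sliding_window_time(durations, concurrency: int) -> float:
    """Total time when a new move starts as soon as any in-flight move finishes."""
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    in_flight = []  # min-heap of finish times of running moves
    now = 0.0
    for d in durations:
        if len(in_flight) == concurrency:
            now = heapq.heappop(in_flight)  # wait for the earliest finisher
        heapq.heappush(in_flight, now + d)
    return max(in_flight) if in_flight else 0.0
```

Take durations [10, 1, 10, 1, 10, 1] with a concurrency of 2. Batch by batch takes 30 time units because every batch is held up by its 10-unit move. The sliding window finishes in 21.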
[jira] [Commented] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned
[ https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773394#comment-16773394 ] GEORGE LI commented on KAFKA-6304: -- [KIP-236|https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment] is addressing the cancel/rollback of pending reassignments. WIP: KAFKA-6359 GitHub Pull Request #: [6296|https://github.com/apache/kafka/pull/6296] > The controller should allow updating the partition reassignment for the > partitions being reassigned > --- > > Key: KAFKA-6304 > URL: https://issues.apache.org/jira/browse/KAFKA-6304 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > > Currently the controller will not process the partition reassignment of a > partition if the partition is already being reassigned. > The issue is that if a broker fails during the partition > reassignment, the partition reassignment may never finish, and users may > want to cancel it. However, the controller will > refuse to do that unless the user manually deletes the partition reassignment ZK > path, forces a controller switch, and then issues the revert command. This is > pretty involved. It seems reasonable for the controller to replace the > ongoing stuck reassignment with the updated partition > assignment.
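Below is a toy Python model of the behavior change this issue asks for. It is not Kafka's controller code, and `ToyController`, `submit`, and `complete` are hypothetical names. With `replace_in_flight=False`, a new target for a partition that is already being reassigned is ignored, as the issue describes for the current controller. With `True`, the new target replaces the stuck one, so a revert is simply a resubmission of the original replicas.

```python
class ToyController:
    """Tracks in-flight reassignments keyed by (topic, partition)."""

    def __init__(self, replace_in_flight: bool):
        self.replace_in_flight = replace_in_flight
        self.in_flight = {}  # (topic, partition) -> target replica tuple

    def submit(self, topic: str, partition: int, target) -> bool:
        key = (topic, partition)
        if key in self.in_flight and not self.replace_in_flight:
            return False  # described current behavior: the new request is not processed
        self.in_flight[key] = tuple(target)  # proposed: replace the stuck target
        return True

    def complete(self, topic: str, partition: int) -> None:
        self.in_flight.pop((topic, partition), None)
```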
[jira] [Commented] (KAFKA-6359) Work for KIP-236
[ https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773351#comment-16773351 ] GEORGE LI commented on KAFKA-6359: -- New Pull Request: https://github.com/apache/kafka/pull/6296 > Work for KIP-236 > > > Key: KAFKA-6359 > URL: https://issues.apache.org/jira/browse/KAFKA-6359 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: GEORGE LI >Priority: Minor > > This issue is for the work described in KIP-236.