[jira] [Commented] (KAFKA-13331) Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events

2021-10-03 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17423761#comment-17423761
 ] 

GEORGE LI commented on KAFKA-13331:
---

Besides the slowness of the reassignments, when the EventQueue is long with a lot 
of UpdateMetadataResponseReceived events, the cluster is also not behaving 
correctly (slow responses / timeouts for some RPC calls). For example, {{kafka-topics.sh  
--bootstrap-server}} will time out like below, because it puts 
{{listPartitionReassignments}} into the same EventQueue. The old way, 
{{kafka-topics.sh  --zookeeper}}, returned fast. 

{code}
$ /usr/lib/kafka/bin/kafka-topics.sh --bootstrap-server :9092 
--topic  --describe
Error while executing topic command : Call(callName=listPartitionReassignments, 
deadlineMs=1632339660352, tries=1, nextAllowedTryMs=1632339660520) timed out at 
1632339660420 after 1 attempt(s)
[2021-09-22 19:41:00,425] ERROR 
org.apache.kafka.common.errors.TimeoutException: 
Call(callName=listPartitionReassignments, deadlineMs=1632339660352, tries=1, 
nextAllowedTryMs=1632339660520) timed out at 1632339660420 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled 
listPartitionReassignments request with correlation id 9 due to node 10043 
being disconnected
 (kafka.admin.TopicCommand$)
{code}

Any RPC call that is put into the EventQueue will be affected during 
reassignments (batch size 50+) in our environment. 
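
As a side note, the same call can be made directly with the Java AdminClient and larger 
client-side timeouts, which at least lets the request outlive a long controller event queue 
while the root cause is being fixed. This is only a hedged sketch against the standard Admin 
API (clients >= 2.5 for default.api.timeout.ms); the bootstrap server and the 300s timeouts 
are placeholder values.

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ListReassignmentsWithLongerTimeout {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
        // Raise the admin client timeouts so the call can survive a long controller event queue.
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300000);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 300000);

        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, PartitionReassignment> reassignments = admin
                .listPartitionReassignments(new ListPartitionReassignmentsOptions().timeoutMs(300000))
                .reassignments()
                .get();
            System.out.println("Ongoing reassignments: " + reassignments.keySet());
        }
    }
}
{code}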



> Slow reassignments in 2.8 because of large number of  
> UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),)
>  Events
> 
>
> Key: KAFKA-13331
> URL: https://issues.apache.org/jira/browse/KAFKA-13331
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, controller, core
>Affects Versions: 2.8.1, 3.0.0
>Reporter: GEORGE LI
>Priority: Critical
> Fix For: 3.1.0
>
> Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png
>
>
> Slowness is observed when doing reassignments on clusters with more brokers 
> (e.g. 80 brokers). 
> After investigation, it looks like the slowness is because, for reassignments, 
> the controller sends an UpdateMetadataRequest to all the brokers for every 
> topic partition affected by the reassignment (maybe some optimization can be 
> done). e.g.:
> For a reassignment with a batch size of 50 partitions, it will generate about 
> 10k-20k ControllerEventManager.EventQueueSize, and the p99 EventQueueTimeMs 
> will be 1M. If the batch size is 100 partitions, it is about 40K EventQueueSize 
> and 3M p99 EventQueueTimeMs. See the screenshot of the metrics below.  
>  !Screen Shot 2021-09-28 at 12.57.34 PM.png! 
> It takes about 10-30 minutes to process 100 reassignments per batch, and 
> 20-30 seconds for 1 reassignment per batch, even when the topic partition is 
> almost empty. In version 1.1, the reassignment is almost instant. 
> Looking at what is in the ControllerEventManager.EventQueue, the majority 
> (depending on how many brokers are in the cluster, it can be 90%+) is 
> {{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),)}}
>  events, which were introduced in this commit: 
> {code}
> commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
> Author: Jason Gustafson 
> Date:   Thu Nov 21 07:41:29 2019 -0800
> MINOR: Controller should log UpdateMetadata response errors (#7717)
> 
> Create a controller event for handling UpdateMetadata responses and log a 
> message when a response contains an error.
> 
> Reviewers: Stanislav Kozlovski , Ismael 
> Juma 
> {code}
> Checking how the events in the ControllerEventManager are processed for the 
> {{UpdateMetadata response}}, it seems like it only checks whether there 
> is an error and simply logs it, but it takes about 3ms-60ms to dequeue 
> each event. Because it's a FIFO queue, other events are left waiting in 
> the queue. 
> {code}
>   private def processUpdateMetadataResponseReceived(updateMetadataResponse: 
> UpdateMetadataResponse, brokerId: Int): Unit = {
> if (!isActive) return
> if (updateMetadataResponse.error != Errors.NONE) {
>   stateChangeLogger.error(s"Received error 
> ${updateMetadataResponse.error} in UpdateMetadata " +
> s"response $updateMetadataResponse from broker $brokerId")
> }
>   }
> {code}
> There might be more sophisticated logic for handling the UpdateMetadata 
> response error in the future. For the current version, would it be better to 
> check whether the response error code is Errors.NONE before putting the event 
> into the Event Queue? e.g. I added this additional check and saw the 
> reassignment performance increase dramatically on the 80-broker cluster. 
> {code}
>  val 

[jira] [Created] (KAFKA-13331) Slow reassignments in 2.8 because of large number of UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),) Events

2021-09-28 Thread GEORGE LI (Jira)
GEORGE LI created KAFKA-13331:
-

 Summary: Slow reassignments in 2.8 because of large number of  
UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),)
 Events
 Key: KAFKA-13331
 URL: https://issues.apache.org/jira/browse/KAFKA-13331
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.0.0, 2.8.1
Reporter: GEORGE LI
 Attachments: Screen Shot 2021-09-28 at 12.57.34 PM.png

Slowness is observed when doing reassignments on clusters with more brokers 
(e.g. 80 brokers). 

After investigation, it looks like the slowness is because, for reassignments, 
the controller sends an UpdateMetadataRequest to all the brokers for every topic 
partition affected by the reassignment (maybe some optimization can be done). e.g.:

For a reassignment with a batch size of 50 partitions, it will generate about 
10k-20k ControllerEventManager.EventQueueSize, and the p99 EventQueueTimeMs 
will be 1M. If the batch size is 100 partitions, it is about 40K EventQueueSize 
and 3M p99 EventQueueTimeMs. See the screenshot of the metrics below.  

 !Screen Shot 2021-09-28 at 12.57.34 PM.png! 
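
These queue metrics can also be watched directly over JMX while a batch is running. A 
minimal sketch, assuming the standard controller MBeans 
({{kafka.controller:type=ControllerEventManager,name=EventQueueSize}} and 
{{kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs}}), the usual Yammer 
attribute names, and a placeholder JMX endpoint:

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ControllerEventQueueWatcher {
    public static void main(String[] args) throws Exception {
        // Placeholder controller host and JMX port; adjust for your environment.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://controller-host:29010/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName queueSize = new ObjectName("kafka.controller:type=ControllerEventManager,name=EventQueueSize");
            ObjectName queueTime = new ObjectName("kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs");
            for (int i = 0; i < 60; i++) {
                // EventQueueSize is a gauge ("Value"); EventQueueTimeMs is a histogram ("99thPercentile").
                Object size = mbsc.getAttribute(queueSize, "Value");
                Object p99 = mbsc.getAttribute(queueTime, "99thPercentile");
                System.out.printf("EventQueueSize=%s EventQueueTimeMs.p99=%s%n", size, p99);
                Thread.sleep(5000);
            }
        }
    }
}
{code}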

It takes about 10-30 minutes to process 100 reassignments per batch, and 20-30 
seconds for 1 reassignment per batch, even when the topic partition is almost 
empty. In version 1.1, the reassignment is almost instant. 

Looking at what is in the ControllerEventManager.EventQueue, the majority 
(depending on how many brokers are in the cluster, it can be 90%+) is 
{{UpdateMetadataResponseReceived(UpdateMetadataResponseData(errorCode=0),)}}
 events, which were introduced in this commit: 

{code}
commit 4e431246c31170a7f632da8edfdb9cf4f882f6ef
Author: Jason Gustafson 
Date:   Thu Nov 21 07:41:29 2019 -0800

MINOR: Controller should log UpdateMetadata response errors (#7717)

Create a controller event for handling UpdateMetadata responses and log a 
message when a response contains an error.

Reviewers: Stanislav Kozlovski , Ismael 
Juma 
{code}

Checking how the events in the ControllerEventManager are processed for the 
{{UpdateMetadata response}}, it seems like it only checks whether there is 
an error and simply logs it, but it takes about 3ms-60ms to dequeue 
each event. Because it's a FIFO queue, other events are left waiting in the 
queue. 

{code}
  private def processUpdateMetadataResponseReceived(updateMetadataResponse: UpdateMetadataResponse, brokerId: Int): Unit = {
    if (!isActive) return

    if (updateMetadataResponse.error != Errors.NONE) {
      stateChangeLogger.error(s"Received error ${updateMetadataResponse.error} in UpdateMetadata " +
        s"response $updateMetadataResponse from broker $brokerId")
    }
  }
{code}

There might be more sophisticated logic for handling the UpdateMetadata 
response error in the future. For the current version, would it be better to 
check whether the response error code is Errors.NONE before putting the event 
into the Event Queue? e.g. I added this additional check and saw the reassignment 
performance increase dramatically on the 80-broker cluster. 

{code}
      val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
        controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
      sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
        val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
        // Additional check of the response error code: if there is no error (which is the
        // case ~99.99% of the time), skip adding updateMetadataResponse to the Event Queue.
        if (updateMetadataResponse.error != Errors.NONE) {
          sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker))
        }
      })
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-21 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367008#comment-17367008
 ] 

GEORGE LI commented on KAFKA-12971:
---

[~ijuma]  Flink 1.9 can use the 2.x kafka-clients, while Flink 1.4 still cannot. 

> Kakfa 1.1.x clients cache broker hostnames,  client stuck when host is 
> swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: GEORGE LI
>Priority: Major
> Fix For: 2.1.2
>
>
> There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug 
> in 0.11 with too frequent consumer offset commits.  Due to the Flink version, 
> it cannot directly use the latest 2.x kafka-client version. 
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
> org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above messages after broker.id 425 had a 
> hardware failure and got swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 
> 0.11.0.3: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
> now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x: 
> {code}
>  public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> if (nodeState.containsKey(id)) {
> NodeConnectionState connectionState = nodeState.get(id);
> connectionState.lastConnectAttemptMs = now;
> connectionState.state = ConnectionState.CONNECTING;
> // Move to next resolved address, or if addresses are exhausted, 
> mark node to be re-resolved
> connectionState.moveToNextAddress();
> } else {
> nodeState.put(id, new 
> NodeConnectionState(ConnectionState.CONNECTING, now,
> this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> }
> {code}
> 2.2.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> NodeConnectionState connectionState = nodeState.get(id);
> if (connectionState != null && connectionState.host().equals(host)) {
> connectionState.lastConnectAttemptMs = now;
> connectionState.state = ConnectionState.CONNECTING;
> // Move to next resolved address, or if addresses are exhausted, 
> mark node to be re-resolved
> connectionState.moveToNextAddress();
> return;
> } else if (connectionState != null) {
> log.info("Hostname for node {} changed from {} to {}.", id, 
> connectionState.host(), host);
> }
> // Create a new NodeConnectionState if nodeState does not already 
> contain one
> // for the specified id or if the hostname associated with the node 
> id changed.
> nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
> now,
> this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} just puts the node into the nodeState 
> HashMap to retry with the updated host.
> In {{1.1.x}}, it adds "caching" logic: {{if 
> (nodeState.containsKey(id))}}. However, if the HOSTNAME of the broker.id is 
> swapped/changed, it never gets to the else block to update the 
> NodeConnectionState with the new hostname.
> In {{2.2.x}}, it adds an additional check, {{if (connectionState != null && 
> connectionState.host().equals(host))}}; if the hostname changed, it calls 
> {{nodeState.put()}} to update the host.
> So from the above, it looks like the 1.1.x caching logic introduced a bug of not 
> updating the nodeState's host when it is changed (e.g. host failure, swapped 
> with a different hostname, but using the same broker.id).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17366011#comment-17366011
 ] 

GEORGE LI commented on KAFKA-12971:
---

This issue is fixed in the {{1.1.x}} kafka-client by backporting the fix in 
KAFKA-7890 from 2.x to invalidate the cache when the hostname is changed. 

> Kakfa 1.1.x clients cache broker hostnames,  client stuck when host is 
> swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: GEORGE LI
>Priority: Major
> Fix For: 2.1.2
>
>
> There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug 
> in 0.11 with too frequent consumer offset commits.  Due to the Flink version, 
> it cannot directly use the latest 2.x kafka-client version. 
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
> org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above messages after broker.id 425 had a 
> hardware failure and got swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 
> 0.11.0.3: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
> now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x: 
> {code}
>  public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> if (nodeState.containsKey(id)) {
> NodeConnectionState connectionState = nodeState.get(id);
> connectionState.lastConnectAttemptMs = now;
> connectionState.state = ConnectionState.CONNECTING;
> // Move to next resolved address, or if addresses are exhausted, 
> mark node to be re-resolved
> connectionState.moveToNextAddress();
> } else {
> nodeState.put(id, new 
> NodeConnectionState(ConnectionState.CONNECTING, now,
> this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> }
> {code}
> 2.2.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup 
> clientDnsLookup) {
> NodeConnectionState connectionState = nodeState.get(id);
> if (connectionState != null && connectionState.host().equals(host)) {
> connectionState.lastConnectAttemptMs = now;
> connectionState.state = ConnectionState.CONNECTING;
> // Move to next resolved address, or if addresses are exhausted, 
> mark node to be re-resolved
> connectionState.moveToNextAddress();
> return;
> } else if (connectionState != null) {
> log.info("Hostname for node {} changed from {} to {}.", id, 
> connectionState.host(), host);
> }
> // Create a new NodeConnectionState if nodeState does not already 
> contain one
> // for the specified id or if the hostname associated with the node 
> id changed.
> nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
> now,
> this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} just puts the node into the nodeState 
> HashMap to retry with the updated host.
> In {{1.1.x}}, it adds "caching" logic: {{if 
> (nodeState.containsKey(id))}}. However, if the HOSTNAME of the broker.id is 
> swapped/changed, it never gets to the else block to update the 
> NodeConnectionState with the new hostname.
> In {{2.2.x}}, it adds an additional check, {{if (connectionState != null && 
> connectionState.host().equals(host))}}; if the hostname changed, it calls 
> {{nodeState.put()}} to update the host.
> So from the above, it looks like the 1.1.x caching logic introduced a bug of not 
> updating the nodeState's host when it is changed (e.g. host failure, swapped 
> with a different hostname, but using the same broker.id).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12971) Kakfa 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread GEORGE LI (Jira)
GEORGE LI created KAFKA-12971:
-

 Summary: Kakfa 1.1.x clients cache broker hostnames,  client stuck 
when host is swapped for the same broker.id
 Key: KAFKA-12971
 URL: https://issues.apache.org/jira/browse/KAFKA-12971
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.1.1, 1.1.0, 1.1.2
Reporter: GEORGE LI
 Fix For: 2.1.2


There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug in 
0.11 with too frequent consumer offset commits.  Due to the Flink version, it 
cannot directly use the latest 2.x kafka-client version. 

{code}
Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
org.apache.kafka.common.errors.DisconnectException.
{code}

Some consumers were stuck with the above messages after broker.id 425 had a 
hardware failure and got swapped with a different hostname. 

Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 

0.11.0.3: 

{code}
    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
    }
{code}


1.1.x: 

{code}
    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
        if (nodeState.containsKey(id)) {
            NodeConnectionState connectionState = nodeState.get(id);
            connectionState.lastConnectAttemptMs = now;
            connectionState.state = ConnectionState.CONNECTING;
            // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
            connectionState.moveToNextAddress();
        } else {
            nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
                this.reconnectBackoffInitMs, host, clientDnsLookup));
        }
    }
{code}


2.2.x: 

{code}
    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
        NodeConnectionState connectionState = nodeState.get(id);
        if (connectionState != null && connectionState.host().equals(host)) {
            connectionState.lastConnectAttemptMs = now;
            connectionState.state = ConnectionState.CONNECTING;
            // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
            connectionState.moveToNextAddress();
            return;
        } else if (connectionState != null) {
            log.info("Hostname for node {} changed from {} to {}.", id,
                connectionState.host(), host);
        }

        // Create a new NodeConnectionState if nodeState does not already contain one
        // for the specified id or if the hostname associated with the node id changed.
        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
    }
{code}


From the above, {{0.11.0.3}} just puts the node into the nodeState HashMap to 
retry with the updated host.

In {{1.1.x}}, it adds "caching" logic: {{if (nodeState.containsKey(id))}}. 
However, if the HOSTNAME of the broker.id is swapped/changed, it never gets to 
the else block to update the NodeConnectionState with the new hostname.

In {{2.2.x}}, it adds an additional check, {{if (connectionState != null && 
connectionState.host().equals(host))}}; if the hostname changed, it calls 
{{nodeState.put()}} to update the host.

So from the above, it looks like the 1.1.x caching logic introduced a bug of not 
updating the nodeState's host when it is changed (e.g. host failure, swapped 
with a different hostname, but using the same broker.id).
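
For reference, a hedged sketch of what the backported check (the KAFKA-7890-style fix 
applied to the 1.1.x {{connecting()}}) looks like; it mirrors the 2.2.x snippet quoted 
above and is not necessarily the exact committed patch:

{code:java}
// Sketch: KAFKA-7890-style fix applied to ClusterConnectionStates.connecting() in 1.1.x.
// Mirrors the 2.2.x logic quoted above; not necessarily the exact committed patch.
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    NodeConnectionState connectionState = nodeState.get(id);
    if (connectionState != null && connectionState.host().equals(host)) {
        // Same broker.id and same hostname: reuse the cached state and advance to the next resolved address.
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        connectionState.moveToNextAddress();
        return;
    } else if (connectionState != null) {
        // Same broker.id but the host was swapped: fall through and rebuild the state with the new hostname.
        log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
    }
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}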





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-24 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220243#comment-17220243
 ] 

GEORGE LI commented on KAFKA-8733:
--

For lossy clusters, setting unclean.leader.election.enable=true will help.  We 
are also rolling out replica.lag.time.max.ms=3 . 
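
If it helps anyone else, the topic-level override can be applied with the standard 
AdminClient incremental alter-configs API. A minimal sketch, assuming a placeholder 
bootstrap server and topic name, and assuming the durability trade-off of unclean leader 
election is acceptable for that topic:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableUncleanLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Placeholder topic name; only apply where losing unflushed messages is acceptable.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-lossy-topic");
            AlterConfigOp enable = new AlterConfigOp(
                new ConfigEntry("unclean.leader.election.enable", "true"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(topic, Collections.singletonList(enable));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}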

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2020-05-20 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111858#comment-17111858
 ] 

GEORGE LI commented on KAFKA-8638:
--

[~hai_lin]

Some of the recent activity about KIP-491 is in KAFKA-4084, where I made a 
patch for version 2.4 (and 1.1) with an installation guide. 



> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, controller, core
>Affects Versions: 1.1.1, 2.3.0, 2.2.1
>Reporter: GEORGE LI
>Assignee: GEORGE LI
>Priority: Major
>
> Currently, the Kafka preferred leader election will pick the broker_id in the 
> topic/partition replica assignment in priority order when the broker is in 
> ISR. The preferred leader is the broker id in the first position of the replica 
> list. There are use cases where, even though the first broker in the replica 
> assignment is in ISR, there is a need for it to be moved to the end of the 
> ordering (lowest priority) when deciding leadership during preferred leader election.
> Let's use topic/partition replica (1,2,3) as an example. 1 is the preferred 
> leader. When preferred leader election is run, it will pick 1 as the leader if 
> it's in ISR; if 1 is not online and in ISR, then pick 2; if 2 is not in ISR, 
> then pick 3 as the leader. There are use cases where, even though 1 is in ISR, 
> we would like it to be moved to the end of the ordering (lowest priority) when 
> deciding leadership during preferred leader election. Below is a list of use 
> cases:
>  * If broker_id 1 is a swapped failed host and brought up with the last segments 
> or the latest offset without historical data (there is another effort on this), 
> it's better for it to not serve leadership till it's caught up.
>  * The cross-data center cluster has AWS instances which have less computing 
> power than the on-prem bare metal machines. We could put the AWS broker_ids 
> in Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
> without changing the reassignments ordering of the replicas.
>  * If the broker_id 1 is constantly losing leadership after some time: 
> "Flapping". we would want to exclude 1 to be a leader unless all other 
> brokers of this topic/partition are offline. The “Flapping” effect was seen 
> in the past when 2 or more brokers were bad, when they lost leadership 
> constantly/quickly, the sets of partition replicas they belong to will see 
> leadership constantly changing. The ultimate solution is to swap these bad 
> hosts. But for quick mitigation, we can also put the bad hosts in the 
> Preferred Leader Blacklist to move the priority of its being elected as 
> leaders to the lowest.
>  * If the controller is busy serving an extra load of metadata requests and 
> other tasks. we would like to put the controller's leaders to other brokers 
> to lower its CPU load. currently bouncing to lose leadership would not work 
> for Controller, because after the bounce, the controller fails over to 
> another broker.
>  * Avoid bouncing broker in order to lose its leadership: it would be good if 
> we have a way to specify which broker should be excluded from serving 
> traffic/leadership (without changing the replica assignment ordering by 
> reassignments, even though that's quick), and run preferred leader election. 
> A bouncing broker will cause temporary URP, and sometimes other issues. Also 
> a bouncing of broker (e.g. broker_id 1) can temporarily lose all its 
> leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, 
> some of its leaderships will likely failover to broker_id 1 on a replica with 
> 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario even 
> broker_id 2 offline, the 3rd broker can take leadership.
> The current work-around of the above is to change the topic/partition's 
> replica reassignments to move the broker_id 1 from the first position to the 
> last position and run preferred leader election. e.g. (1, 2, 3) => (2, 3, 1). 
> This changes the replica reassignments, and we need to keep track of the 
> original one and restore if things change (e.g. controller fails over to 
> another broker, the swapped empty broker caught up). That’s a rather tedious 
> task.
> KIP is located at 
> [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-08 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17103028#comment-17103028
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

hmm...That's weird. 

auto.leader.rebalance.enable seems to be functioning as it is meant to.  We need to 
make sure the controller has it set correctly.  

I wonder what the running config is.  Could you try this:

{code}
~/confluent-kafka-go$ git remote -v
origin  https://github.com/confluentinc/confluent-kafka-go.git (fetch)
origin  https://github.com/confluentinc/confluent-kafka-go.git (push)

~/confluent-kafka-go$ go run 
examples/admin_describe_config/admin_describe_config.go :9092 
broker   |grep auto
auto.leader.rebalance.enable = false
  STATIC_BROKER_CONFIG 
Read-only:true Sensitive:false
   auto.create.topics.enable = true 
  STATIC_BROKER_CONFIG 
Read-only:true Sensitive:false
{code}

The above auto.leader.rebalance.enable = false is the real/actual running config.  
Some configs can be changed dynamically while the process is running, so I just want 
to make sure.  Do it for all brokers.   
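
An equivalent check can also be done with the Java AdminClient. A minimal sketch, 
assuming a placeholder bootstrap server and broker ids (in practice the ids can come 
from describeCluster()):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CheckAutoLeaderRebalance {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Placeholder broker ids; in practice iterate over admin.describeCluster().nodes().get().
            for (String brokerId : new String[] {"0", "1", "2"}) {
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
                Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
                ConfigEntry entry = config.get("auto.leader.rebalance.enable");
                System.out.printf("broker %s: auto.leader.rebalance.enable=%s (source=%s)%n",
                    brokerId, entry.value(), entry.source());
            }
        }
    }
}
{code}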

Another cause might be some cluster management software running (like Cruise 
Control) that might be doing PLE periodically; that will make the current 
leader = first replica when the first replica is in ISR.



> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772
 ] 

GEORGE LI edited comment on KAFKA-4084 at 4/19/20, 7:28 AM:


[~blodsbror]

I am not very familiar with the 5.4 setup. 

Do you have the error message of the crash in the log?  Is it missing the 
zkclient jar like below? 

{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar 
 0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
 0 Mon Nov 18 18:11:58 UTC 2019 org/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}

If this jar file was there before, please copy it back.   I need to find out 
why it was missing after the build; maybe some dependency setup in Gradle.  I 
have also updated the [install doc 
|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]
 to use `./gradlew clean build -x test`. 

Also make sure the startup script for Kafka is not hard-coding the 5.4 jars, but 
takes the jars from the lib classpath, e.g.:

{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G 
-XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 
-XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=29010 
-Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' 
kafka.Kafka /etc/kafka/server.properties
{code}


If you give us more details, we can help more. 

Thanks


Actually, I just patched and added back the zkclient libs for the Gradle build.  
Please "git clone https://github.com/sql888/kafka.git" (or git pull) and try 
to build again.  I suspect that was the issue.   Otherwise, we need to see the 
errors of the crash from the Kafka logs. 




was (Author: sql_consulting):
[~blodsbror]

I am not very familiar with 5.4 setup. 

Do you have the error message of the crash in the log?  is it missing the 
zkclient jar like below? 

{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar 
 0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
 0 Mon Nov 18 18:11:58 UTC 2019 org/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}

If this jar file was there before, please copy it back.   I need to find out 
why it was missing after the build.  maybe some dependency setup in gradle.  I 
have also update the [install doc 
|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]
 using `./gradew clean build -x test` 

Also make sure the startup script for kafka is not hard coding 5.4 jars,  but 
take the jars from the lib classpath?  e.g.

{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G 
-XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 
-XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=29010 
-Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' 
kafka.Kafka /etc/kafka/server.properties
{code}


If you give us more details, we can help more. 

Thanks


Actually,  I just patched and added back zkclient libs for the gradle build.  
Please "git clone https://github.com/sql888/kafka.git; and try to build again.  
I suspect that was the issue.   Otherwise, we need to see the errors of the 
crash from the kafka logs. 



> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
> 

[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772
 ] 

GEORGE LI edited comment on KAFKA-4084 at 4/19/20, 7:20 AM:


[~blodsbror]

I am not very familiar with 5.4 setup. 

Do you have the error message of the crash in the log?  is it missing the 
zkclient jar like below? 

{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar 
 0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
 0 Mon Nov 18 18:11:58 UTC 2019 org/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}

If this jar file was there before, please copy it back.   I need to find out 
why it was missing after the build.  maybe some dependency setup in gradle.  I 
have also update the [install doc 
|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]
 using `./gradew clean build -x test` 

Also make sure the startup script for kafka is not hard coding 5.4 jars,  but 
take the jars from the lib classpath?  e.g.

{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G 
-XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 
-XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=29010 
-Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' 
kafka.Kafka /etc/kafka/server.properties
{code}


If you give us more details, we can help more. 

Thanks


Actually,  I just patched and added back zkclient libs for the gradle build.  
Please "git clone https://github.com/sql888/kafka.git; and try to build again.  
I suspect that was the issue.   Otherwise, we need to see the errors of the 
crash from the kafka logs. 




was (Author: sql_consulting):
[~blodsbror]

I am not very familiar with 5.4 setup. 

Do you have the error message of the crash in the log?  is it missing the 
zkclient jar like below? 

{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar 
 0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
 0 Mon Nov 18 18:11:58 UTC 2019 org/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}

If this jar file was there before, please copy it back.   I need to find out 
why it was missing after the build.  maybe some dependency setup in gradle.  I 
have also update the [install doc 
|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]
 using `./gradew clean build -x test` 

Also make sure the startup script for kafka is not hard coding 5.4 jars,  but 
take the jars from the lib classpath?  e.g.

{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G 
-XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 
-XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=29010 
-Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' 
kafka.Kafka /etc/kafka/server.properties
{code}


If you give us more details, we can help more. 

Thanks

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} 

[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086772#comment-17086772
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

I am not very familiar with 5.4 setup. 

Do you have the error message of the crash in the log?  is it missing the 
zkclient jar like below? 

{code}
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar
$ jar tvf zkclient-0.11.jar 
 0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
 0 Mon Nov 18 18:11:58 UTC 2019 org/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
 0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
{code}

If this jar file was there before, please copy it back.   I need to find out 
why it was missing after the build.  maybe some dependency setup in gradle.  I 
have also update the [install doc 
|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]
 using `./gradew clean build -x test` 

Also make sure the startup script for kafka is not hard coding 5.4 jars,  but 
take the jars from the lib classpath?  e.g.

{code}
/usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G 
-XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 
-XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=29010 
-Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' 
kafka.Kafka /etc/kafka/server.properties
{code}


If you give us more details, we can help more. 

Thanks

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-08 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077958#comment-17077958
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

Probably doing PLE with too many partitions at once is not good.  We have 
scripted it to take all partitions with preferred leader imbalance (e.g. the 
current leader != first replica, and the first replica is in ISR).  

Then we divide them into batches (e.g. 100 partitions per batch, with a throttle 
sleep of about 5-10 seconds between batches).   We also verify each batch after 
submitting it for PLE, e.g. that the ZK node 
//admin/preferred_replica_election  is gone.  


For the KIP-491 patch, maybe I should write a wrapper for doing PLE, because now 
the logic is not just current_leader != first replica, but: current_leader 
!=   


The batch logic is basically writing the topic/partitions into a JSON file 
(e.g. 100 per batch), and then submitting that batch using the open source script 
`kafka-preferred-replica-election.sh`.  Below is a shell script to do PLE for 
one topic (all partitions).  It's still using ZK to submit the JSON; it can be 
changed to --bootstrap-server (see the AdminClient sketch after the script). 


{code}
$ cat topic_preferred_leader_election.sh
.
name=$1
topic=$2
kafka_cluster_name="${name}"
zk=$(kafka_zk_lookup ${kafka_cluster_name})
json_filename="${name}_${topic}_leader_election.json"

touch ${json_filename}

echo "{\"partitions\":[" >${json_filename}
IFS=$'\n'
for partition in `/usr/lib/kafka/bin/kafka-run-class.sh 
kafka.admin.TopicCommand --zookeeper $zk --describe --topic $topic  2>/dev/null 
|grep Partition:|awk -F "Partition:" '{print $2}'|awk '{print $1}'`
do
  if [ "$partition" == "0" ]
  then
echo " {\"topic\": \"${topic}\", \"partition\": ${partition}}" 
>>${json_filename}
  else
echo ",{\"topic\": \"${topic}\", \"partition\": ${partition}}" 
>>${json_filename}
  fi
done

echo "]}" >>${json_filename}

/usr/lib/kafka/bin/kafka-preferred-replica-election.sh --zookeeper $zk 
--path-to-json-file ${json_filename} 2>/dev/null
#rm ${json_filename}
{code}
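
A bootstrap-server based equivalent of the batching can be done with the Java 
AdminClient's electLeaders API (available since 2.4). A minimal sketch, assuming a 
placeholder bootstrap server and topic name, a 100-partition batch size, and a 
10-second throttle as described above:

{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class BatchedPreferredLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
        String topic = "my-topic"; // placeholder

        try (Admin admin = Admin.create(props)) {
            int partitionCount = admin.describeTopics(Collections.singleton(topic))
                .all().get().get(topic).partitions().size();

            Set<TopicPartition> batch = new HashSet<>();
            for (int p = 0; p < partitionCount; p++) {
                batch.add(new TopicPartition(topic, p));
                if (batch.size() == 100 || p == partitionCount - 1) {
                    // Preferred leader election for this batch, then throttle before the next one.
                    admin.electLeaders(ElectionType.PREFERRED, new HashSet<>(batch)).partitions().get();
                    batch.clear();
                    Thread.sleep(10000);
                }
            }
        }
    }
}
{code}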


For troubleshooting the timeout, maybe check the ZK node 
//admin/preferred_replica_election and see whether any pending PLE is there. 
Maybe it is because of the KIP-491 preferred leader deprioritized/black list? I 
doubt it, because I have tested that it worked.   Does this PLE work before 
applying the KIP-491 patch? 

I think a ZooKeeper node has a size limit of 1MB,   so 5000-6000 partitions doing 
PLE all together in one batch might not work.   How about trying one topic 
first, then 100 in a batch?

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-06 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076124#comment-17076124
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] 

If this turns out to be positive in the testing, I can restart the discussion 
on the dev mailing list for KIP-491; at least it works/helps with 
auto.leader.rebalance.enable=true. 

There are other use cases listed in KIP-491.   e.g.  when the controller is busy 
with metadata requests, we can set this dynamic config for the controller, run 
PLE, and the controller will give up all its leadership and act just as a 
follower; CPU usage goes down 10-15%, making it lightweight while doing its work, 
with no need to bounce the controller.   I know some companies are working on the 
feature of separating the controller onto another set of machines.  


Our primary use case of this `leader.deprioritized.list=` feature is bundled 
together with another feature called replica.start.offset.strategy=latest, 
which I have not filed a KIP for (the default is earliest, like the current 
Kafka behavior); this is also a dynamic config that can be set at the broker 
level (or globally for the cluster).   What it does is: when a broker fails, 
loses all its local disks, and is replaced with an empty broker, the empty 
broker would need to start replication from the earliest offset by default; 
for us, this could be 20TB+ of data over a few hours and can cause outages if 
not throttled properly.   So, just like the Kafka consumer, we introduce the 
dynamic config replica.start.offset.strategy=latest to just replicate from 
each partition leader's latest offset.   Once it's caught up (URP => 0 for this 
broker), usually in 5-10 minutes or sooner, the dynamic config is removed.   
Because this broker does not have all the historical data, it should not be 
serving leaderships.   That's where the KIP-491 `leader.deprioritized.list=` 
feature comes into play.    The automation software will calculate the 
retention time at the broker and topic level, take the max, and once the broker 
has been in replication for that amount of time (e.g.  6 hours, 1 day, 3 days, 
whatever), the automation software will remove the leader.deprioritized.list 
dynamic config for the broker and run PLE to change the leadership back to it. 


> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-06 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076089#comment-17076089
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

I think it's still OK to set auto.leader.rebalance.enable=true, even for a 
broker that has failed and been replaced, coming up empty.  

There should be some kind of automation that first sets the 
`leader.deprioritized.list=`  dynamic config at the ''  cluster 
global level, so the current controller and a possible failover controller can 
pick it up immediately.  Then start the new replaced broker.  While the broker 
is busy catching up, because it is at the lowest priority for being considered 
for leadership, auto.leader.rebalance.enable=true is effectively disabled for 
this broker.  

After this broker catches up (e.g.  URP => 0,  CPU/disk, etc. back to normal),  
the dynamic config above can be removed by the automation script, and with 
auto.leader.rebalance.enable=true, leadership will automatically go back to the 
preferred leader (the first/head of the partition assignment) on this broker. 
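
For illustration only, a hedged sketch of how such automation could set and later remove 
the dynamic config with the Java AdminClient. Note that leader.deprioritized.list comes 
from the KIP-491 patch and does not exist in upstream Kafka; the bootstrap server and 
broker id 1234 are placeholders:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DeprioritizeReplacedBroker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
        // An empty resource name targets the cluster-wide default ('') broker config.
        ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
        // leader.deprioritized.list is from the KIP-491 patch; it does not exist in upstream Kafka.
        AlterConfigOp set = new AlterConfigOp(
            new ConfigEntry("leader.deprioritized.list", "1234"), AlterConfigOp.OpType.SET); // placeholder id
        Map<ConfigResource, Collection<AlterConfigOp>> add =
            Collections.singletonMap(clusterDefault, Collections.singletonList(set));

        try (Admin admin = Admin.create(props)) {
            admin.incrementalAlterConfigs(add).all().get();
            // ...start the replaced broker and wait until it catches up (URP => 0 for the retention window),
            // then the automation removes the config again and runs PLE:
            AlterConfigOp delete = new AlterConfigOp(
                new ConfigEntry("leader.deprioritized.list", ""), AlterConfigOp.OpType.DELETE);
            Map<ConfigResource, Collection<AlterConfigOp>> remove =
                Collections.singletonMap(clusterDefault, Collections.singletonList(delete));
            admin.incrementalAlterConfigs(remove).all().get();
        }
    }
}
{code}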






> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-06 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076006#comment-17076006
 ] 

GEORGE LI edited comment on KAFKA-4084 at 4/6/20, 7:13 AM:
---

[~blodsbror]

I had some free time this weekend to troubleshoot and found out that after 1.1 
(at least in 2.4), the controller code has an optimization for PLE: it does not 
run PLE at all if current_leader == head of the replica list.  That caused my 
unit/integration tests to fail, so I have patched that as well.  I have landed my 
code change on my repo's feature branch 2.4-leader-deprioritized-list (based on 
the 2.4 branch).

Detailed installation and testing steps are in this 
[Google doc|https://docs.google.com/document/d/14vlPkbaog_5Xdd-HB4vMRaQQ7Fq4SlxddULsvc3PlbY/edit]. 
Please let me know if you have issues with the patch/testing.  If you cannot view 
the doc, please click the request access button, or send me your email address to 
add to the share.  My email: sqlconsult...@gmail.com

Please keep us posted with your testing results.

Thanks,
George



was (Author: sql_consulting):
[~blodsbror]

Have some free time this weekend to troubleshoot and found out after 1.1 , at 
least in 2.4,  the controller code has some optimization for PLE, not running 
PLE at all if  current_leader == Head of Replica.   That had cause my 
unit/integration tests to fail.  I have patched that as well.   I have landed 
my code change to  my repo's feature branch. 2.4-leader-deprioritized-list 
(based on the 2.4 branch)

detail installation and testing  steps in this [Google 
doc|https://docs.google.com/document/d/1ZuOcYTSuCAqCut_hjI_EY3lA9W7BuIlHVUdOUcSpWww/edit].
   Please let me know if you have issues with the patch/testing. If can not 
view the doc, please click the request access button. or send me your email to 
add to the share.   my email: sqlconsult...@gmail.com

Please keep us posted with your testing results. 

Thanks,
George





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-05 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076006#comment-17076006
 ] 

GEORGE LI edited comment on KAFKA-4084 at 4/6/20, 2:26 AM:
---

[~blodsbror]

I had some free time this weekend to troubleshoot and found out that after 1.1 
(at least in 2.4), the controller code has an optimization for PLE: it does not 
run PLE at all if current_leader == head of the replica list.  That caused my 
unit/integration tests to fail, so I have patched that as well.  I have landed my 
code change on my repo's feature branch 2.4-leader-deprioritized-list (based on 
the 2.4 branch).

Detailed installation and testing steps are in this 
[Google doc|https://docs.google.com/document/d/1ZuOcYTSuCAqCut_hjI_EY3lA9W7BuIlHVUdOUcSpWww/edit]. 
Please let me know if you have issues with the patch/testing.  If you cannot view 
the doc, please click the request access button, or send me your email address to 
add to the share.  My email: sqlconsult...@gmail.com

Please keep us posted with your testing results.

Thanks,
George



was (Author: sql_consulting):
[~blodsbror]

Have some free time this weekend to troubleshoot and found out after 1.1 , at 
least in 2.4,  the controller code has some optimization for PLE, not running 
PLE at all if  current_leader == Head of Replica.   That had cause my 
unit/integration tests to fail.  I have patched that as well.   I have landed 
my code change to  my repo's feature branch. 2.4-leader-deprioritized-list 
(based on the 2.4 branch)

detail installation and testing  steps in this [Google 
doc|https://docs.google.com/document/d/1ZuOcYTSuCAqCut_hjI_EY3lA9W7BuIlHVUdOUcSpWww/edit].
   Please let me know if you have issues with the patch/testing. If can not 
view the doc, please click the request access button. or send me your email to 
add to the share.   my email: sqlconsult...@gmail.com

Please keep us posted with your testing results. 

Thanks,
George




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-05 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17076006#comment-17076006
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

I had some free time this weekend to troubleshoot and found out that after 1.1 
(at least in 2.4), the controller code has an optimization for PLE: it does not 
run PLE at all if current_leader == head of the replica list.  That caused my 
unit/integration tests to fail, so I have patched that as well.  I have landed my 
code change on my repo's feature branch 2.4-leader-deprioritized-list (based on 
the 2.4 branch).

Detailed installation and testing steps are in this 
[Google doc|https://docs.google.com/document/d/1ZuOcYTSuCAqCut_hjI_EY3lA9W7BuIlHVUdOUcSpWww/edit]. 
Please let me know if you have issues with the patch/testing.  If you cannot view 
the doc, please click the request access button, or send me your email address to 
add to the share.  My email: sqlconsult...@gmail.com

Please keep us posted with your testing results.

Thanks,
George




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-03-31 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072259#comment-17072259
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

Very busy with work and this COVID-19 work-from-home ticket.  :)  The last time I 
worked on this, I had issues getting some unit tests to pass after porting from 
1.1.x to 2.4.  Let me look into them again.  Sorry for the delay.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-03-04 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17051509#comment-17051509
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

Sure.  I am working on porting the patch from 1.1.x to 2.4.  It looks like some 
of the logic was split out of PartitionStateMachine.scala into Election.scala, so 
I will need some time to adapt the change and the corresponding 
unit/integration/system tests.  I will post the patch once it's ready.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-28 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17048116#comment-17048116
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]
[~sriharsha]

Hi Evan,  I am so sorry; I have been busy with work and forgot about providing 
the KIP-491 patch for Confluent 5.4.  I just checked the backlog emails.  Let me 
work on it this weekend.






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032598#comment-17032598
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] 

Which Kafka version are you running?  Maybe I can provide a diff of the KIP-491 
changes for you to apply at your end to try it out.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267
 ] 

GEORGE LI edited comment on KAFKA-4084 at 2/7/20 10:02 AM:
---

[~blodsbror] 
[~junrao]
[~sriharsha] 

With `auto.leader.rebalance.enable=true`, even if we check that the broker is in 
ISR before rebalancing leadership onto it, the partitions it starts leading can 
still be impacted, because the broker is still trying to catch up.

[~blodsbror]
Does your cluster use a lossless setting like min.insync.replicas > 1?  For 
lossless setups, we see high producer latency during catch-up or reassignments. 
The leader deprioritized list feature is convenient in this case: it simply puts 
the broker that is catching up at the lowest priority when the controller 
considers leadership.  Another useful case is when the current controller is very 
busy with metadata requests; "blacklisting" it so that it only serves as a 
follower can give 10-15% of CPU back to the controller (without bouncing it).

In a cluster with no down brokers and no URP (Under Replicated Partitions), 
there is a workaround: run reassignments to move the broker in question to the 
end of the partition assignment.  For example, for broker_id 100, change the 
assignment (100, 101, 102) => (101, 102, 100); the reassignment completes quickly 
because all replicas are in ISR, and a preferred leader election then moves the 
leader from 100 => 101.  The downside is that it is more work to roll back or 
rebalance again, whereas KIP-491 is very easy to roll back: just unset 
the dynamic config.
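A minimal sketch of that workaround, assuming a newer Kafka CLI (2.6+, where both 
tools accept --bootstrap-server) and a hypothetical topic georgeli_test with 
replicas 100, 101, 102:

{code}
# Move broker 100 to the end of the replica list for partition 0.
cat > reassign.json <<'EOF'
{"version":1,
 "partitions":[{"topic":"georgeli_test","partition":0,"replicas":[101,102,100]}]}
EOF

# Fast when all replicas are already in ISR (no data actually moves).
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign.json --execute

# Then run a preferred leader election so broker 101 takes over from 100.
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type preferred --topic georgeli_test --partition 0
{code}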

In the case of a cluster with URP, the reassignment approach of moving the broker 
to the end of the assignment might not work, because the broker is not in ISR; 
the reassignment will stay pending until it catches up and rejoins the ISR.

So maybe we do have a good use case for KIP-491?





was (Author: sql_consulting):
[~blodsbror] [~junrao][~sriharsha] 

With `auto.leader.rebalance.enable=true`,  even with checking the broker is in 
ISR, then rebalance the leadership to this broker.  It might still impact the 
partition that this broker is serving leadership because the broker is still 
trying to catch up.  

[~blodsbror]  is your cluster  having lossless setting like min.insync.replicas 
> 1 ?   For lossless,  we experience high producer latency during  catchup or  
reassignments. The leader deprioritized list feature is convenient in this 
case. it will just put this broker catching up in the lowest priority when 
considering being the leader.   Another useful case is when the current 
controlller is very busy with metadata request,  "blacklist" it, and only 
serving as followers can give 10-15% CPU back to the controller (without 
bouncing it). 

In a cluster without any down brokers,  no URP (Under Replicated Partitions),   
there is a workaround to  run reassignments to move that broker to the end of 
the partition assignment,  e.g.  broker_id 100 is down.then partition 
assignment (100, 101, 102) => (101, 102, 100).  the reassignment should 
complete fast because all replicas in ISR.  then run preferred leader election 
will change the leader from 100 => 101.   The downside is:  its more work to 
rollback or rebalance again.KIP-491 is very easy to rollback, just unset 
the dynamic config.

In the case of the cluster with URP,  the reassignment approach to move the 
broker to the end of the assignment might not work, because the broker is not 
in ISR. the reassignments will be pending till it catches up and in ISR. 

So maybe we do have a good use-case for KIP-491 ?





[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] [~junrao] [~sriharsha]

With `auto.leader.rebalance.enable=true`, even if we check that the broker is in 
ISR before rebalancing leadership onto it, the partitions it starts leading can 
still be impacted, because the broker is still trying to catch up.

[~blodsbror]
Does your cluster use a lossless setting like min.insync.replicas > 1?  For 
lossless setups, we see high producer latency during catch-up or reassignments. 
The leader deprioritized list feature is convenient in this case: it simply puts 
the broker that is catching up at the lowest priority when the controller 
considers leadership.  Another useful case is when the current controller is very 
busy with metadata requests; "blacklisting" it so that it only serves as a 
follower can give 10-15% of CPU back to the controller (without bouncing it).

In a cluster with no down brokers and no URP (Under Replicated Partitions), 
there is a workaround: run reassignments to move the broker in question to the 
end of the partition assignment.  For example, for broker_id 100, change the 
assignment (100, 101, 102) => (101, 102, 100); the reassignment completes quickly 
because all replicas are in ISR, and a preferred leader election then moves the 
leader from 100 => 101.  The downside is that it is more work to roll back or 
rebalance again, whereas KIP-491 is very easy to roll back: just unset the 
dynamic config.

In the case of a cluster with URP, the reassignment approach of moving the broker 
to the end of the assignment might not work, because the broker is not in ISR; 
the reassignment will stay pending until it catches up and rejoins the ISR.

So maybe we do have a good use case for KIP-491?







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-04 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17030183#comment-17030183
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~evanjpw]  

One solution we have in-house is tooling that does an "offline rebuild" of the 
failed, empty broker.  The idea is to keep Kafka on the new empty broker down 
(not serving any traffic) and copy the log segment/index files this broker is 
supposed to host from other brokers in the cluster, distributing and throttling 
the copy so it does not affect the brokers serving production traffic.  Once the 
copy is done, modify the replication offset meta file and start Kafka on the new 
node; since the historical logs are already there, the "delta" catch-up from the 
leaders is small, and the broker catches up much faster.

This is just the main idea.  It is internal tooling that is tied to our 
infrastructure.  Hope this helps.
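A very rough sketch of that idea (this is not the internal tool; it assumes rsync, 
a hypothetical source broker kafka101, a log dir of /var/kafka-spool/data, and 
that you verify the checkpoint-file handling against your Kafka version):

{code}
# 1. Keep Kafka stopped on the replaced broker.

# 2. Copy the partition directories this broker should host from another replica,
#    throttled so production traffic is not affected (~50 MB/s here).
rsync -av --bwlimit=50000 \
  kafka101:/var/kafka-spool/data/georgeli_test-0/ \
  /var/kafka-spool/data/georgeli_test-0/

# 3. Update the replication offset checkpoint in the log dir so the broker resumes
#    from the copied data instead of offset 0 (the file lists one
#    "<topic> <partition> <offset>" entry per partition after a version line and
#    an entry count -- verify the format before editing).

# 4. Start Kafka; the ReplicaFetcher threads only need to fetch the small "delta"
#    from the current leaders.
{code}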





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] We have implemented KIP-491 internally.

We use a dynamic config, leader.deprioritized.list, set at the cluster-wide 
default level so that, in case of a controller failover, the new controller 
inherits this dynamic config setting.

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This puts broker 10001 at the lowest priority when the controller considers 
leadership for a partition, regardless of whether this broker is in the 1st 
position of the assignment (i.e. the preferred leader).  If it is currently 
serving as leader, a preferred leader election will move leadership to another 
broker in the ISR.

We have also implemented another feature, separate from KIP-491: when an empty 
broker is starting up, a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" can be set to "latest" (the default is 
"earliest", matching current upstream behavior).  Just like a consumer, the 
broker then fetches from the current leaders' latest offsets instead of the 
earliest (start) offsets, so the failed, empty broker comes up very fast.  This 
feature is used together with the KIP-491 {{leader.deprioritized.list}} to 
"blacklist" the broker from serving leadership (because it does not have enough 
data yet).  After it has been replicating for some time (the broker/topic-level 
retention period), the broker is completely caught up, the 
{{leader.deprioritized.list}} entry is removed, and once a preferred leader 
election is run the broker can serve traffic again.  We haven't proposed this in 
a KIP yet, but I think it is also a good feature.

Maybe I will restart the KIP-491 discussion in the dev mailing list.
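To make the combined workflow concrete, here is a rough sketch built from the 
commands shown elsewhere in this thread (assumptions: both configs come from our 
internal KIP-491 / KAFKA-8903 patches rather than upstream Kafka, and broker id 
1028 is just an example):

{code}
# Before starting the replaced (empty) broker 1028:

# 1. Deprioritize it for leadership, cluster-wide (KIP-491 patch).
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers \
  --entity-default --alter --add-config leader.deprioritized.list=1028

# 2. Have it fetch from the leaders' latest offsets (KAFKA-8903 patch).
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers \
  --entity-name 1028 --alter --add-config replica.start.offset.strategy=Latest

# 3. Start broker 1028; its replicas join the ISR almost immediately with ~0 lag.

# 4. After roughly one retention period, once it holds enough data, remove both
#    configs and run a preferred leader election so it can lead again.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers \
  --entity-name 1028 --alter --delete-config replica.start.offset.strategy
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers \
  --entity-default --alter --delete-config leader.deprioritized.list
{code}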





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299
 ] 

GEORGE LI edited comment on KAFKA-4084 at 2/3/20 9:26 PM:
--

[~blodsbror] We have implemented KIP-491 internally.

We use a dynamic config, leader.deprioritized.list, set at the cluster-wide 
default level so that, in case of a controller failover, the new controller 
inherits this dynamic config setting.

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This puts broker 10001 at the lowest priority when the controller considers 
leadership for a partition, regardless of whether this broker is in the 1st 
position of the assignment (i.e. the preferred leader).  If it is currently 
serving as leader, a preferred leader election will move leadership to another 
broker in the ISR.

We have also implemented another feature, separate from KIP-491: when an empty 
broker is starting up, a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" can be set to "latest" (the default is 
"earliest", matching current upstream behavior).  Just like a consumer, the 
broker then fetches from the current leaders' latest offsets instead of the 
earliest (start) offsets, so the failed, empty broker comes up very fast.  This 
feature is used together with the KIP-491 {{leader.deprioritized.list}} to 
"blacklist" the broker from serving leadership (because it does not have enough 
data yet).  After it has been replicating for some time (the broker/topic-level 
retention period), the broker is completely caught up, the 
{{leader.deprioritized.list}} entry is removed, and once a preferred leader 
election is run the broker can serve traffic again.  We haven't proposed this in 
a KIP yet, but I think it is also a good feature.

Maybe I will restart the KIP-491 discussion in the dev mailing list.



was (Author: sql_consulting):
[~blodsbror] We have implemented KIP-491 internally.   

Using a dynamic config leader.deprioritized.list to set it for the cluster 
global level  ('') so in case of controller failover, the new 
controller will inherit this dynamic config settings. 

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This will put the broker 10001 to the lowest priority when controller is 
considering leadership for that partition,  regardless this broker is in the 
1st position of the assignment (namely : preferred leader),  if this is 
currently serving leadership,  the preferred leader election will move it to 
another broker in the ISR. 

We have also implemented another feature separate from KIP-491 that when an 
empty broker is starting up,  a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" is set to "latest" (default is "earliest" 
like current upstream behavior),   just like a consumer, it will fetch from 
current leaders'  latest offsets instead of  earliest(start) offset.  So this 
makes the failed empty broker coming up very fast.This feature is used 
together with KIP-491  {{leader.deprioritized.list}} to blacklist this broker 
to serve traffic (because it does not have enough data).  After it's in 
replication for sometime (retention of the broker/topic level),  this broker is 
completely caught-up, and the {{leader.deprioritized.list}}  is removed. and 
when preferred leader is run, this broker can serve traffic again.  We haven't 
proposed this in any KIP yet.  But I think this is also a good features. 

maybe I will restart the KIP-491 discussion in the dev mailing list.



[jira] [Updated] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset

2019-09-12 Thread GEORGE LI (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI updated KAFKA-8903:
-
Description: 
It is very common (and sometimes frequent) in a large Kafka deployment with 
thousands of brokers that a broker has hardware failures (disk, memory, CPU, 
NIC).  The failed host is replaced by a new one with the same "broker.id", and 
the new broker starts up empty: all topic/partitions start at offset 0.  If the 
current leader has a start offset > 0, the replaced broker will start each 
partition from the leader's earliest (start) offset.

If the number and size of the partitions this broker hosts are high, it can take 
quite some time for the ReplicaFetcher threads to pull from all the leaders in 
the cluster, and it can put load on the brokers/leaders in the cluster, affecting 
latency and overall performance.  Once the replaced broker is caught up, a 
preferred leader election can be run to move the leaders back to this broker.

To avoid the above performance impact and make the failed-broker replacement 
process much easier and more scalable, we are proposing a new dynamic config 
_replica.start.offset.strategy_.  The default is Earliest, and it can be 
dynamically set to Latest for a broker (or brokers).  If it is set to Latest, 
when the empty broker starts up, all partitions start from the current leader's 
latest offset (LEO, LogEndOffset), so the replaced broker's replicas are in ISR 
with 0 TotalLag/MaxLag and 0 URP almost instantly.

For preferred leader election, we can wait until the retention time has passed 
and the replaced broker has been replicating for long enough.  The better/safer 
approach is to enable the Preferred Leader Blacklist mentioned in KAFKA-8638 / 
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982], 
so that before the replaced broker is completely caught up, its leadership 
priority is moved to the lowest.










  was:
It very common (and sometimes frequent) that a broker has hardware failures 
(disk, memory, cpu, nic) for large Kafka deployment with thousands of brokers.  
The failed host will be replaced by a new one with the same "broker.id",  and 
the new broker starts up as empty.  All topic/partitions will start with offset 
0.  If the current leader has start offset > 0,  this replaced broker will 
start the partition from the leader's earliest (start) offset. 

If the number of partitions  and size of the partitions that this broker is 
hosting is high, it would take quite sometime for the ReplicaFetcher threads to 
pull from all the leaders in the cluster.  and it could incur load of the 
brokers/leaders in the cluster affecting Latency, etc.  performance.   Once 
this replaced broker is caught up,  Preferred leader election can be run to 
move the leaders back to this broker. 

To avoid above performance impact and make the failed broker replacement 
process much easier and scalable,  we are proposing a new Dynamic config {{ 
replica.start.offset.strategy}}.  The default is Earliest, and can be 
dynamically set for a broker (or brokers) to Latest.  If it's set to Latest,  
when the empty broker is starting up, all partitions will be starting from 
latest (LEO LogEndOffset) of the current leader.  So the replace broker 
replicas are in ISR and have 0 TotalLag/MaxLag, 0 URP almost instantly. 

For preferred leadership election, we can wait till the retention time has 
passed, and this replaced broker is in the replication for enough time.  The 
better/safer approach is enable Preferred Leader Blacklist  mentioned in  
KAFKA-8638 /  
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]
  ,  so before this replaced broker is completely caught up,  it's leadership 
determination priority is moved to the lowest. 












[jira] [Updated] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset

2019-09-12 Thread GEORGE LI (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI updated KAFKA-8903:
-
Description: 
It is very common (and sometimes frequent) in a large Kafka deployment with 
thousands of brokers that a broker has hardware failures (disk, memory, CPU, 
NIC).  The failed host is replaced by a new one with the same "broker.id", and 
the new broker starts up empty: all topic/partitions start at offset 0.  If the 
current leader has a start offset > 0, the replaced broker will start each 
partition from the leader's earliest (start) offset.

If the number and size of the partitions this broker hosts are high, it can take 
quite some time for the ReplicaFetcher threads to pull from all the leaders in 
the cluster, and it can put load on the brokers/leaders in the cluster, affecting 
latency and overall performance.  Once the replaced broker is caught up, a 
preferred leader election can be run to move the leaders back to this broker.

To avoid the above performance impact and make the failed-broker replacement 
process much easier and more scalable, we are proposing a new dynamic config 
{{replica.start.offset.strategy}}.  The default is Earliest, and it can be 
dynamically set to Latest for a broker (or brokers).  If it is set to Latest, 
when the empty broker starts up, all partitions start from the current leader's 
latest offset (LEO, LogEndOffset), so the replaced broker's replicas are in ISR 
with 0 TotalLag/MaxLag and 0 URP almost instantly.

For preferred leader election, we can wait until the retention time has passed 
and the replaced broker has been replicating for long enough.  The better/safer 
approach is to enable the Preferred Leader Blacklist mentioned in KAFKA-8638 / 
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982], 
so that before the replaced broker is completely caught up, its leadership 
priority is moved to the lowest.










  was:
It very common (and sometimes frequent) that a broker has hardware failures 
(disk, memory, cpu, nic) for large Kafka deployment with thousands of brokers.  
The failed host will be replaced by a new one with the same "broker.id",  and 
the new broker starts up as empty.  All topic/partitions will start with offset 
0.  If the current leader has start offset > 0,  this replaced broker will 
start the partition from the leader's earliest (start) offset. 

If the number of partitions  and size of the partitions that this broker is 
hosting is high, it would take quite sometime for the ReplicaFetcher threads to 
pull from all the leaders in the cluster.  and it could incur load of the 
brokers/leaders in the cluster affecting Latency, etc.  performance.   Once 
this replaced broker is caught up,  Preferred leader election can be run to 
move the leaders back to this broker. 

To avoid above performance impact and make the failed broker replacement 
process much easier and scalable,  we are proposing a new Dynamic config {{ 
replica.start.offset.strategy}}.  The default is Earliest, and can be 
dynamically set for a broker (or brokers) to Latest.  If it's set to Latest,  
when the empty broker is starting up, all partitions will be starting from 
latest (LEO LogEndOffset) of the current leader.  So the replace broker 
replicas are in ISR and have 0 TotalLag/MaxLag, 0 URP almost instantly. 

For preferred leadership election, we can wait till the retention time has 
passed, and this replaced broker is in the replication for enough time.  The 
better/safer approach is enable Preferred Leader Blacklist  mentioned in  
KAFKA-8638 /  KIP-491  ,  so before this replaced broker is completely caught 
up,  it's leadership determination priority is moved to the lowest. 












[jira] [Commented] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset

2019-09-12 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928906#comment-16928906
 ] 

GEORGE LI commented on KAFKA-8903:
--

e.g. 


{code:java}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--add-config replica.start.offset.strategy=Latest --entity-type brokers 
--entity-name 1028
Completed updating config for broker: 1028.

[2019-09-08 06:48:34,997] 1307 [/config/changes-event-process-thread] INFO  
kafka.server.DynamicConfigManager  - Processing override for entityPath: 
brokers/1028 with config: Map(replica.start.offset.strategy -> Latest)
..

[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO  
kafka.server.ReplicaFetcherThread  - [ReplicaFetcher replicaId=1028, 
leaderId=1025, fetcherId=1] brokerConfig.ReplicaStartOffsetStrategy: Latest
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO  
kafka.server.ReplicaFetcherThread  - [ReplicaFetcher replicaId=1028, 
leaderId=1025, fetcherId=1] replica.logEndOffset.messageOffset: 0
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] WARN  
kafka.server.ReplicaFetcherThread  - [ReplicaFetcher replicaId=1028, 
leaderId=1025, fetcherId=1] replica.start.offset.strategy: Latest. Reset fetch 
offset for partition georgeli_test-0 from 0 to current leader's latest offset 
339347
...
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.LogCleaner  - The cleaning for partition georgeli_test-0 is aborted 
and paused
[2019-09-08 07:34:31,826] 1777 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.Log  - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] 
Scheduling log segment [baseOffset 0, size 0] for deletion.
...
[2019-09-08 07:34:31,828] 1779 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.LogCleaner  - Compaction for partition georgeli_test-0 is resumed
...
{code}



To remove the config and use the default "Earliest" leader start offset:
{code:java}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--delete-config replica.start.offset.strategy --entity-type brokers 
--entity-name 1028
Completed updating config for broker: 1028.

[2019-09-08 23:18:20,581] 468051 [/config/changes-event-process-thread] INFO  
kafka.common.ZkNodeChangeNotificationListener  - Processing notification(s) to 
/config/changes
[2019-09-08 23:18:20,588] 468058 [/config/changes-event-process-thread] INFO  
kafka.server.DynamicConfigManager  - Processing override for entityPath: 
brokers/1028 with config: Map()
[2019-09-08 23:18:20,589] 468059 [/config/changes-event-process-thread] INFO  
kafka.server.KafkaConfig  - KafkaConfig values: 
advertised.host.name = kafka1028-dc3
...
replica.start.offset.strategy = Earliest
...

[2019-09-08 23:23:36,408] 1773 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.Log  - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] 
Truncating to 0 has no effect as the largest offset in the log is -1
[2019-09-08 23:23:36,439] 1804 [ReplicaFetcherThread-1-1025] WARN  
kafka.server.ReplicaFetcherThread  - [ReplicaFetcher replicaId=1028, 
leaderId=1025, fetcherId=1] Reset fetch offset for partition georgeli_test-0 
from 0 to current leader's start offset 319246
[2019-09-08 23:23:36,440] 1805 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.LogCleaner  - The cleaning for partition georgeli_test-0 is aborted 
and paused
[2019-09-08 23:23:36,440] 1805 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.Log  - [Log partition=georgeli_test-0, dir=/var/kafka-spool/data] 
Scheduling log segment [baseOffset 0, size 0] for deletion.
[2019-09-08 23:23:36,441] 1806 [ReplicaFetcherThread-1-1025] INFO  
kafka.log.LogCleaner  - Compaction for partition georgeli_test-0 is resumed
[2019-09-08 23:23:36,449] 1814 [ReplicaFetcherThread-1-1025] INFO  
kafka.server.ReplicaFetcherThread  - [ReplicaFetcher replicaId=1028, 
leaderId=1025, fetcherId=1] Current offset 0 for partition georgeli_test-0 is 
out of range, which typically implies a leader change. Reset fetch offset to 
319246
...
{code}


From the above, we can see the effect of the new config 
"replica.start.offset.strategy" (Earliest vs. Latest) on the empty/new 
partition's start offset:  319246 vs. 339347.
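
For illustration, here is a rough Scala sketch (not the actual 
ReplicaFetcherThread code) of the decision the proposed config implies for an 
empty replica: Earliest falls back to the leader's log start offset (319246 
above), while Latest jumps to the leader's log end offset (339347 above).

{code}
// Simplified sketch of the proposed replica.start.offset.strategy behavior for
// an empty replica (local log end offset == 0).  Names are illustrative only.
sealed trait StartOffsetStrategy
case object Earliest extends StartOffsetStrategy
case object Latest extends StartOffsetStrategy

def initialFetchOffsetForEmptyReplica(leaderLogStartOffset: Long,
                                      leaderLogEndOffset: Long,
                                      strategy: StartOffsetStrategy): Long =
  strategy match {
    case Latest   => leaderLogEndOffset    // e.g. 339347 in the log above
    case Earliest => leaderLogStartOffset  // e.g. 319246 (current behavior)
  }

// initialFetchOffsetForEmptyReplica(319246L, 339347L, Latest)   == 339347
// initialFetchOffsetForEmptyReplica(319246L, 339347L, Earliest) == 319246
{code}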



> allow the new replica (offset 0) to catch up with current leader using latest 
> offset
> 
>
> Key: KAFKA-8903
> URL: https://issues.apache.org/jira/browse/KAFKA-8903
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core
>Affects Versions: 1.1.0, 1.1.1, 2.3.0
>Reporter: GEORGE LI
>Assignee: GEORGE LI
>Priority: Major
>
> It very common (and sometimes frequent) that a broker has hardware failures 
> (disk, memory, cpu, nic) for large Kafka deployment with thousands of 
> brokers.  The failed 

[jira] [Created] (KAFKA-8903) allow the new replica (offset 0) to catch up with current leader using latest offset

2019-09-12 Thread GEORGE LI (Jira)
GEORGE LI created KAFKA-8903:


 Summary: allow the new replica (offset 0) to catch up with current 
leader using latest offset
 Key: KAFKA-8903
 URL: https://issues.apache.org/jira/browse/KAFKA-8903
 Project: Kafka
  Issue Type: Improvement
  Components: config, core
Affects Versions: 2.3.0, 1.1.1, 1.1.0
Reporter: GEORGE LI
Assignee: GEORGE LI


It is very common (and sometimes frequent) for a broker to have hardware 
failures (disk, memory, cpu, nic) in a large Kafka deployment with thousands of 
brokers.  The failed host will be replaced by a new one with the same 
"broker.id", and the new broker starts up empty.  All topic/partitions will 
start with offset 0.  If the current leader has a start offset > 0, this 
replaced broker will start the partition from the leader's earliest (start) 
offset. 

If the number and size of the partitions that this broker is hosting are high, 
it would take quite some time for the ReplicaFetcher threads to pull from all 
the leaders in the cluster, and it could add load to the brokers/leaders in the 
cluster, affecting latency and other performance.  Once this replaced broker is 
caught up, preferred leader election can be run to move the leaders back to it. 

To avoid the above performance impact and make the failed-broker replacement 
process much easier and more scalable, we are proposing a new dynamic config 
{{replica.start.offset.strategy}}.  The default is Earliest, and it can be 
dynamically set to Latest for a broker (or brokers).  If it's set to Latest, 
when the empty broker starts up, all partitions will start from the latest 
offset (LEO, LogEndOffset) of the current leader, so the replaced broker's 
replicas are in ISR with 0 TotalLag/MaxLag and 0 URP almost instantly. 

For preferred leader election, we can wait until the retention time has passed 
and this replaced broker has been replicating for long enough.  The better/safer 
approach is to enable the Preferred Leader Blacklist mentioned in KAFKA-8638 / 
KIP-491, so that before this replaced broker is completely caught up, its 
leadership priority is moved to the lowest. 














[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2019-08-05 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900461#comment-16900461
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~jiaxinye] does the patch from 
https://issues.apache.org/jira/browse/KAFKA-7299   help?   

Any idea why step 6 has 100% CPU utilization?  Is it doing heavy ReplicaFetcher 
pulls of data to the followers on this previously offline broker?  If so, then 
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]
 might help by putting this broker into a temporary preferred leader "blacklist"; 
once it's up and running stably with low CPU usage, remove it from the blacklist 
and the auto leader rebalance can proceed. 



> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.





[jira] [Commented] (KAFKA-8725) Improve LogCleaner error handling when failing to grab the filthiest log

2019-08-02 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16899237#comment-16899237
 ] 

GEORGE LI commented on KAFKA-8725:
--

We saw similar issues.  After the thread dies, the "good" partitions also stop 
being cleaned and accumulate a backlog. 

One more improvement might be to restart the log cleaner threads without 
bouncing the broker.  The current dynamic config log.cleaner.threads seems to be 
able to start the threads only once. 
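
For reference, a minimal Scala sketch of changing the log.cleaner.threads 
dynamic broker config programmatically via the Admin client (equivalent to the 
kafka-configs.sh CLI; broker id 1028 and the bootstrap address are just example 
values).  Per the observation above, this appears to start additional cleaner 
threads only the first time and may not resurrect a dead one.

{code}
import java.util.{Arrays, HashMap, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object BumpLogCleanerThreads {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // Per-broker dynamic config for broker id 1028 (example id only).
      val broker = new ConfigResource(ConfigResource.Type.BROKER, "1028")
      val op = new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"),
                                 AlterConfigOp.OpType.SET)
      val configs = new HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
      configs.put(broker, Arrays.asList(op))
      admin.incrementalAlterConfigs(configs).all().get()
    } finally {
      admin.close()
    }
  }
}
{code}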



> Improve LogCleaner error handling when failing to grab the filthiest log
> 
>
> Key: KAFKA-8725
> URL: https://issues.apache.org/jira/browse/KAFKA-8725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-7215 improved error handling in 
> the log cleaner with the goal of not having the whole thread die when an 
> exception happens, but rather mark the partition that caused it as 
> uncleanable and continue cleaning the error-free partitions.
> Unfortunately, the current code can still bubble up an exception and cause 
> the thread to die when an error happens before we can grab the filthiest log 
> and start cleaning it. At that point, we don't have a clear reference to the 
> log that caused the exception and chose to throw an IllegalStateException - 
> [https://github.com/apache/kafka/blob/39bcc8447c906506d63b8df156cf90174bbb8b78/core/src/main/scala/kafka/log/LogCleaner.scala#L346]
>  (as seen in https://issues.apache.org/jira/browse/KAFKA-8724)
> Essentially, exceptions in `grabFilthiestCompactedLog` still cause the thread 
> to die. This can be further improved by trying to catch what log caused the 
> exception in the aforementioned function





[jira] [Resolved] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments

2019-07-19 Thread GEORGE LI (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI resolved KAFKA-8663.
--
Resolution: Won't Fix

Looks like RAR + OAR is required for KIP-455 to preserve the exact 
targetReplicas ordering, and the old replicas that need to be dropped are in the 
new removingReplicas field of the /brokers/topics/ ZK node. 


> partition assignment would be better original_assignment + new_reassignment 
> during reassignments
> 
>
> Key: KAFKA-8663
> URL: https://issues.apache.org/jira/browse/KAFKA-8663
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.1.1, 2.3.0
>Reporter: GEORGE LI
>Priority: Minor
>
> From my observation/experience during reassignment,  the partition assignment 
> replica ordering gets changed.   because it's  OAR + RAR  (original replicas 
> + reassignment replicas)  set union. 
> However, it seems like the preferred leaders changed during the 
> reassignments.  Normally if there is no cluster preferred leader election,  
> the leader is still the old leader.  But if during the reassignments, there 
> is a leader election,  the leadership changes.  This caused some side 
> effects.  Let's look at this example.
> {code}
> Topic:georgeli_test   PartitionCount:8ReplicationFactor:3 Configs:
>   Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
> 1026,1028,1025Isr: 1026,1028,1025
> {code}
> reassignment  (1026,1028,1025) => (1027,1025,1028)
> {code}
> Topic:georgeli_test   PartitionCount:8ReplicationFactor:4 
> Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
>   Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
> 1027,1025,1028,1026   Isr: 1026,1028,1025
> {code}
> Notice the above:   Leader remains 1026.   but Replicas: 1027,1025,1028,1026. 
>   If we run preferred leader election,  it will try 1027 first, then 1025.
> After  1027 is in ISR,  then the final assignment will be  (1027,1025,1028).  
>   
> My proposal for a minor improvement is to keep the original ordering replicas 
> during the reassignment (could be long for big topic/partitions).  and after 
> all replicas in ISR, then finally set the partition assignment to New 
> reassignment.  
> {code}
>   val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ 
> controllerContext.partitionReplicaAssignment(topicPartition)).toSet
>   //1. Update AR in ZK with OAR + RAR.
>   updateAssignedReplicasForPartition(topicPartition, 
> newAndOldReplicas.toSeq)
> {code} 
> above code changed to below to keep the original ordering first during 
> reassignment: 
> {code}
>   val newAndOldReplicas = 
> (controllerContext.partitionReplicaAssignment(topicPartition) ++ 
> reassignedPartitionContext.newReplicas).toSet
> {code}





[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-14 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884787#comment-16884787
 ] 

GEORGE LI commented on KAFKA-8638:
--

Here is the KIP:  
[KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982]

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, controller, core
>Affects Versions: 1.1.1, 2.3.0, 2.2.1
>Reporter: GEORGE LI
>Assignee: GEORGE LI
>Priority: Major
>
> Currently, the kafka preferred leader election will pick the broker_id in the 
> topic/partition replica assignments in a priority order when the broker is in 
> ISR. The preferred leader is the broker id in the first position of replica. 
> There are use-cases that, even the first broker in the replica assignment is 
> in ISR, there is a need for it to be moved to the end of ordering (lowest 
> priority) when deciding leadership during  preferred leader election. 
> Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
> leader.  When preferred leadership is run, it will pick 1 as the leader if 
> it's ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, 
> then pick 3 as the leader. There are use cases that, even 1 is in ISR, we 
> would like it to be moved to the end of ordering (lowest priority) when 
> deciding leadership during preferred leader election.   Below is a list of 
> use cases:
> * (If broker_id 1 is a swapped failed host and brought up with last segments 
> or latest offset without historical data (There is another effort on this), 
> it's better for it to not serve leadership till it's caught-up.
> * The cross-data center cluster has AWS instances which have less computing 
> power than the on-prem bare metal machines.  We could put the AWS broker_ids 
> in Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
> without changing the reassignments ordering of the replicas. 
> * If the broker_id 1 is constantly losing leadership after some time: 
> "Flapping". we would want to exclude 1 to be a leader unless all other 
> brokers of this topic/partition are offline.  The “Flapping” effect was seen 
> in the past when 2 or more brokers were bad, when they lost leadership 
> constantly/quickly, the sets of partition replicas they belong to will see 
> leadership constantly changing.  The ultimate solution is to swap these bad 
> hosts.  But for quick mitigation, we can also put the bad hosts in the 
> Preferred Leader Blacklist to move the priority of its being elected as 
> leaders to the lowest. 
> *  If the controller is busy serving an extra load of metadata requests and 
> other tasks. we would like to put the controller's leaders to other brokers 
> to lower its CPU load. currently bouncing to lose leadership would not work 
> for Controller, because after the bounce, the controller fails over to 
> another broker.
> * Avoid bouncing broker in order to lose its leadership: it would be good if 
> we have a way to specify which broker should be excluded from serving 
> traffic/leadership (without changing the replica assignment ordering by 
> reassignments, even though that's quick), and run preferred leader election.  
> A bouncing broker will cause temporary URP, and sometimes other issues.  Also 
> a bouncing of broker (e.g. broker_id 1) can temporarily lose all its 
> leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, 
> some of its leaderships will likely failover to broker_id 1 on a replica with 
> 3 brokers.  If broker_id 1 is in the blacklist, then in such a scenario even 
> broker_id 2 offline,  the 3rd broker can take leadership. 
> The current work-around of the above is to change the topic/partition's 
> replica reassignments to move the broker_id 1 from the first position to the 
> last position and run preferred leader election. e.g. (1, 2, 3) => (2, 3, 1). 
> This changes the replica reassignments, and we need to keep track of the 
> original one and restore if things change (e.g. controller fails over to 
> another broker, the swapped empty broker caught up). That’s a rather tedious 
> task.
>  





[jira] [Commented] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments

2019-07-13 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884526#comment-16884526
 ] 

GEORGE LI commented on KAFKA-8663:
--

As we can see from the original comment in the code: 

{code}
 //1. Update AR in ZK with OAR + RAR.
{code}

But in the actual implementation, it's doing RAR + OAR instead (a different 
ordering). 
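
To illustrate the ordering difference with the example from the description 
(using distinct on a Seq purely to keep a deterministic order; the controller 
code builds a set union):

{code}
val oar = Seq(1026, 1028, 1025)  // original assigned replicas
val rar = Seq(1027, 1025, 1028)  // reassignment target replicas

// actual behavior (RAR + OAR): new replicas come first, so 1027 becomes the preferred leader
val rarFirst = (rar ++ oar).distinct   // List(1027, 1025, 1028, 1026)

// proposed (OAR + RAR): original ordering is kept first, so 1026 stays the preferred leader
val oarFirst = (oar ++ rar).distinct   // List(1026, 1028, 1025, 1027)
{code}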

> partition assignment would be better original_assignment + new_reassignment 
> during reassignments
> 
>
> Key: KAFKA-8663
> URL: https://issues.apache.org/jira/browse/KAFKA-8663
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.1.1, 2.3.0
>Reporter: GEORGE LI
>Priority: Minor
>
> From my observation/experience during reassignment,  the partition assignment 
> replica ordering gets changed.   because it's  OAR + RAR  (original replicas 
> + reassignment replicas)  set union. 
> However, it seems like the preferred leaders changed during the 
> reassignments.  Normally if there is no cluster preferred leader election,  
> the leader is still the old leader.  But if during the reassignments, there 
> is a leader election,  the leadership changes.  This caused some side 
> effects.  Let's look at this example.
> {code}
> Topic:georgeli_test   PartitionCount:8ReplicationFactor:3 Configs:
>   Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
> 1026,1028,1025Isr: 1026,1028,1025
> {code}
> reassignment  (1026,1028,1025) => (1027,1025,1028)
> {code}
> Topic:georgeli_test   PartitionCount:8ReplicationFactor:4 
> Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
>   Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
> 1027,1025,1028,1026   Isr: 1026,1028,1025
> {code}
> Notice the above:   Leader remains 1026.   but Replicas: 1027,1025,1028,1026. 
>   If we run preferred leader election,  it will try 1027 first, then 1025.
> After  1027 is in ISR,  then the final assignment will be  (1027,1025,1028).  
>   
> My proposal for a minor improvement is to keep the original ordering replicas 
> during the reassignment (could be long for big topic/partitions).  and after 
> all replicas in ISR, then finally set the partition assignment to New 
> reassignment.  
> {code}
>   val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ 
> controllerContext.partitionReplicaAssignment(topicPartition)).toSet
>   //1. Update AR in ZK with OAR + RAR.
>   updateAssignedReplicasForPartition(topicPartition, 
> newAndOldReplicas.toSeq)
> {code} 
> above code changed to below to keep the original ordering first during 
> reassignment: 
> {code}
>   val newAndOldReplicas = 
> (controllerContext.partitionReplicaAssignment(topicPartition) ++ 
> reassignedPartitionContext.newReplicas).toSet
> {code}





[jira] [Updated] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments

2019-07-13 Thread GEORGE LI (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI updated KAFKA-8663:
-
Description: 
From my observation/experience during reassignment, the partition assignment 
replica ordering gets changed, because the assignment becomes a set union of the 
OAR and RAR (original replicas and reassignment replicas). 

However, it seems like the preferred leader changes during the reassignment.  
Normally, if there is no preferred leader election in the cluster, the leader is 
still the old leader.  But if a leader election happens during the reassignment, 
the leadership changes.  This caused some side effects.  Let's look at this 
example.

{code}
Topic:georgeli_test   PartitionCount:8   ReplicationFactor:3   Configs:
    Topic: georgeli_test   Partition: 0   Leader: 1026   Replicas: 1026,1028,1025   Isr: 1026,1028,1025
{code}

reassignment  (1026,1028,1025) => (1027,1025,1028)

{code}
Topic:georgeli_test   PartitionCount:8   ReplicationFactor:4   Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
    Topic: georgeli_test   Partition: 0   Leader: 1026   Replicas: 1027,1025,1028,1026   Isr: 1026,1028,1025
{code}

Notice the above: the leader remains 1026, but Replicas is 1027,1025,1028,1026.  
If we run preferred leader election, it will try 1027 first, then 1025.  After 
1027 is in ISR, the final assignment will be (1027,1025,1028).

My proposal for a minor improvement is to keep the original replica ordering 
during the reassignment (which could take long for big topics/partitions), and 
after all replicas are in ISR, finally set the partition assignment to the new 
reassignment.  

{code}
  val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ 
controllerContext.partitionReplicaAssignment(topicPartition)).toSet
  //1. Update AR in ZK with OAR + RAR.
  updateAssignedReplicasForPartition(topicPartition, 
newAndOldReplicas.toSeq)
{code} 

The above code would be changed to the below to keep the original ordering first 
during reassignment: 

{code}
  val newAndOldReplicas = 
(controllerContext.partitionReplicaAssignment(topicPartition) ++ 
reassignedPartitionContext.newReplicas).toSet
{code}

  was:
From my observation/experience during reassignment,  the partition assignment 
replica ordering gets changed.   because it's  OAR + RAR  (original replicas + 
reassignment replicas)  set union. 

However, it seems like the preferred leaders changed during the reassignments.  
Normally if there is no cluster preferred leader election,  the leader is still 
the old leader.  But if during the reassignments, there is a leader election,  
the leadership changes.  This caused some side effects.  Let's look at this 
example.

{code}
Topic:georgeli_test PartitionCount:8ReplicationFactor:3 Configs:
Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
1026,1028,1025Isr: 1026,1028,1025
{code}

reassignment  (1026,1028,1025) => (1027,1025,1028)

{code}
Topic:georgeli_test PartitionCount:8ReplicationFactor:4 
Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
Topic: georgeli_testPartition: 0Leader: 1026Replicas: 
1027,1025,1028,1026   Isr: 1026,1028,1025
{code}

Notice the above:   Leader remains 1026.   but Replicas: 1027,1025,1028,1026.   
If we run preferred leader election,  it will try 1027 first, then 1025.
After  1027 is in ISR,  then the final assignment will be  (1027,1025,1028).

My proposal for a minor improvement is to keep the original ordering replicas 
during the reassignment (could be long for big topic/partitions).  and after 
all replicas in ISR, then finally set the partition assignment to New 
reassignment.  

{code}
  val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ 
controllerContext.partitionReplicaAssignment(topicPartition)).toSet
  //1. Update AR in ZK with OAR + RAR.
  updateAssignedReplicasForPartition(topicPartition, 
newAndOldReplicas.toSeq)
{code} 

above code changed to below to keep the original ordering during reassignment: 

{code}
  val newAndOldReplicas = 
(controllerContext.partitionReplicaAssignment(topicPartition) ++ 
reassignedPartitionContext.newReplicas).toSet
{code}


> partition assignment would be better original_assignment + new_reassignment 
> during reassignments
> 
>
> Key: KAFKA-8663
> URL: https://issues.apache.org/jira/browse/KAFKA-8663
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.1.1, 2.3.0
>Reporter: GEORGE LI
>Priority: Minor
>
> From my 

[jira] [Created] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments

2019-07-13 Thread GEORGE LI (JIRA)
GEORGE LI created KAFKA-8663:


 Summary: partition assignment would be better original_assignment 
+ new_reassignment during reassignments
 Key: KAFKA-8663
 URL: https://issues.apache.org/jira/browse/KAFKA-8663
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core
Affects Versions: 2.3.0, 1.1.1
Reporter: GEORGE LI


From my observation/experience during reassignment, the partition assignment 
replica ordering gets changed, because the assignment becomes a set union of the 
OAR and RAR (original replicas and reassignment replicas). 

However, it seems like the preferred leader changes during the reassignment.  
Normally, if there is no preferred leader election in the cluster, the leader is 
still the old leader.  But if a leader election happens during the reassignment, 
the leadership changes.  This caused some side effects.  Let's look at this 
example.

{code}
Topic:georgeli_test   PartitionCount:8   ReplicationFactor:3   Configs:
    Topic: georgeli_test   Partition: 0   Leader: 1026   Replicas: 1026,1028,1025   Isr: 1026,1028,1025
{code}

reassignment  (1026,1028,1025) => (1027,1025,1028)

{code}
Topic:georgeli_test   PartitionCount:8   ReplicationFactor:4   Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
    Topic: georgeli_test   Partition: 0   Leader: 1026   Replicas: 1027,1025,1028,1026   Isr: 1026,1028,1025
{code}

Notice the above: the leader remains 1026, but Replicas is 1027,1025,1028,1026.  
If we run preferred leader election, it will try 1027 first, then 1025.  After 
1027 is in ISR, the final assignment will be (1027,1025,1028).

My proposal for a minor improvement is to keep the original replica ordering 
during the reassignment (which could take long for big topics/partitions), and 
after all replicas are in ISR, finally set the partition assignment to the new 
reassignment.  

{code}
  val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ 
controllerContext.partitionReplicaAssignment(topicPartition)).toSet
  //1. Update AR in ZK with OAR + RAR.
  updateAssignedReplicasForPartition(topicPartition, 
newAndOldReplicas.toSeq)
{code} 

The above code would be changed to the below to keep the original ordering during reassignment: 

{code}
  val newAndOldReplicas = 
(controllerContext.partitionReplicaAssignment(topicPartition) ++ 
reassignedPartitionContext.newReplicas).toSet
{code}





[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-09 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881480#comment-16881480
 ] 

GEORGE LI commented on KAFKA-8638:
--

Hi Jose,

Because a broker can have hundreds/thousands of topic/partitions assigned to it, 
doing reassignments to move it to the end to lower its priority, and then 
remembering the original ordering so it can be restored later, is much more 
tedious than simply putting it in a "deprioritized list" for some time and 
removing it when certain conditions are improved/met.  

We have a Rebalance Tool which rebalances the whole cluster; it's better not to 
keep changing the assignment replica ordering constantly.  With the 
"deprioritized list", it's cleaner. 

Let's just take the use case of taking the controller out of serving 
leadership/traffic, so that it acts only as a follower.  We observed that a 
broker not serving any leaders has lower CPU utilization.  For clusters where a 
busy controller is doing extra work compared to other brokers, we would like it 
not to take any leaders.  Right now, for a broker to lose leadership, we need to 
bounce the broker; in this case, the controller fails over to another broker.  
If we instead change the ordering of the current assignments for the controller, 
then the next time the controller fails over, we need to do the same again.  

For managing the "deprioritized list", the user (e.g. the on-call engineer 
seeing an issue with a broker that should not serve leadership traffic) should 
have the ability to add/remove entries.  My initial thoughts on how to store 
this "deprioritized list" are the 2 approaches below: 

* Design #1:
Introduce a Preferred Leader Blacklist, e.g. ZK path/node: 
/preferred_leader_blacklist/ 

Direct manipulation of ZK should be avoided as Kafka is moving toward being RPC 
based, so a new Request/Response RPC call is needed.  

No ZK watcher on this ZK node's children is needed to trigger leadership changes 
in the current design.  

* Design #2:
Introduce a preferred_leader_blacklist dynamic config which is empty by default.  
It allows a list of broker IDs separated by commas.  E.g. below, broker IDs 1, 
10, and 65 are put into the blacklist. 


{code}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 
--entity-type brokers --entity-default --alter --add-config 
preferred_leader_blacklist=1,10,65
{code}

Since Kafka dynamic configs already use --bootstrap-server, this approach does 
not need to manipulate ZooKeeper directly.  The downside: when adding/removing 
one broker from the list, instead of touching one ZK node per broker as in 
Design #1 above, the dynamic config needs to be updated with a new complete 
list.  E.g. in order to remove broker 10 from the blacklist, update 
preferred_leader_blacklist=1,65. 

The dynamic config should not trigger any leadership changes in the current 
design. 
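
For illustration, a small Scala sketch of the intended semantics (this is not 
the controller's election code; the names are made up): blacklisted brokers keep 
their replica positions but drop to the lowest election priority, and the 
blacklist itself could come from the Design #2 dynamic config value.

{code}
object PreferredLeaderWithBlacklist {
  // e.g. dynamic config value "1,10,65" -> Set(1, 10, 65)
  def parseBlacklist(configValue: String): Set[Int] =
    configValue.split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toSet

  // Non-blacklisted replicas keep the assignment order and are tried first;
  // blacklisted replicas are only considered if no other replica is in ISR.
  def pickLeader(assignment: Seq[Int], isr: Set[Int], blacklist: Set[Int]): Option[Int] = {
    val (deprioritized, preferred) = assignment.partition(blacklist.contains)
    (preferred ++ deprioritized).find(isr.contains)
  }
}

// PreferredLeaderWithBlacklist.pickLeader(Seq(1, 2, 3), Set(1, 2, 3), blacklist = Set(1)) == Some(2)
// PreferredLeaderWithBlacklist.pickLeader(Seq(1, 2, 3), Set(1),       blacklist = Set(1)) == Some(1)
{code}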




> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, controller, core
>Affects Versions: 1.1.1, 2.3.0, 2.2.1
>Reporter: GEORGE LI
>Assignee: GEORGE LI
>Priority: Major
>
> Currently, the kafka preferred leader election will pick the broker_id in the 
> topic/partition replica assignments in a priority order when the broker is in 
> ISR. The preferred leader is the broker id in the first position of replica. 
> There are use-cases that, even the first broker in the replica assignment is 
> in ISR, there is a need for it to be moved to the end of ordering (lowest 
> priority) when deciding leadership during  preferred leader election. 
> Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
> leader.  When preferred leadership is run, it will pick 1 as the leader if 
> it's ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, 
> then pick 3 as the leader. There are use cases that, even 1 is in ISR, we 
> would like it to be moved to the end of ordering (lowest priority) when 
> deciding leadership during preferred leader election.   Below is a list of 
> use cases:
> * (If broker_id 1 is a swapped failed host and brought up with last segments 
> or latest offset without historical data (There is another effort on this), 
> it's better for it to not serve leadership till it's caught-up.
> * The cross-data center cluster has AWS instances which have less computing 
> power than the on-prem bare metal machines.  We could put the AWS broker_ids 
> in Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
> without changing the reassignments ordering of the replicas. 
> * If the broker_id 1 is constantly losing leadership after some time: 
> "Flapping". we would want to exclude 1 to be a leader unless all other 

[jira] [Updated] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread GEORGE LI (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI updated KAFKA-8638:
-
Description: 
Currently, the Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order when the broker is in ISR.  
The preferred leader is the broker id in the first position of the replica list.  
There are use cases where, even when the first broker in the replica assignment 
is in ISR, there is a need for it to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. 

Let's use topic/partition replica (1,2,3) as an example; 1 is the preferred 
leader.  When preferred leader election is run, it will pick 1 as the leader if 
it's in ISR; if 1 is not online and in ISR, then pick 2; if 2 is not in ISR, 
then pick 3 as the leader.  There are use cases where, even when 1 is in ISR, we 
would like it to be moved to the end of the ordering (lowest priority) when 
deciding leadership during preferred leader election.  Below is a list of use 
cases:

* If broker_id 1 is a swapped failed host brought up with the last segments or 
the latest offset without historical data (there is another effort on this), 
it's better for it to not serve leadership till it's caught up.

* The cross-data-center cluster has AWS instances which have less computing 
power than the on-prem bare metal machines.  We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas. 

* If broker_id 1 is constantly losing leadership after some time ("flapping"), 
we would want to exclude 1 from being a leader unless all other brokers of this 
topic/partition are offline.  The "flapping" effect was seen in the past when 2 
or more brokers were bad: when they lost leadership constantly/quickly, the sets 
of partition replicas they belong to would see leadership constantly changing.  
The ultimate solution is to swap these bad hosts, but for quick mitigation we 
can also put the bad hosts in the Preferred Leader Blacklist to move their 
priority of being elected as leaders to the lowest. 

* If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaders to other brokers to 
lower its CPU load.  Currently, bouncing to lose leadership would not work for 
the controller, because after the bounce, the controller fails over to another 
broker.

* Avoid bouncing a broker in order to make it lose its leadership: it would be 
good if we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering by 
reassignments, even though that's quick), and then run preferred leader 
election.  A bouncing broker will cause temporary URP, and sometimes other 
issues.  Also, a bounce of a broker (e.g. broker_id 1) can temporarily remove 
all its leadership, but if another broker (e.g. broker_id 2) fails or gets 
bounced, some of its leaderships will likely fail over to broker_id 1 on a 
replica set with 3 brokers.  If broker_id 1 is in the blacklist, then in such a 
scenario, even with broker_id 2 offline, the 3rd broker can take leadership. 


The current workaround for the above is to change the topic/partition's replica 
assignment to move broker_id 1 from the first position to the last position and 
run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1).  This changes the 
replica assignment, and we need to keep track of the original one and restore it 
if things change (e.g. the controller fails over to another broker, or the 
swapped empty broker catches up).  That's a rather tedious task.
 

  was:
Currently, the kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignments in a priority order when the broker is in 
ISR. The preferred leader is the broker id in the first position of replica. 
There are use-cases that, even the first broker in the replica assignment is in 
ISR, there is a need for it to be moved to the end of ordering (lowest 
priority) when deciding leadership during  preferred leader election. 

Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
leader.  When preferred leadership is run, it will pick 1 as the leader if it's 
ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, then pick 
3 as the leader. There are use cases that, even 1 is in ISR, we would like it 
to be moved to the end of ordering (lowest priority) when deciding leadership 
during preferred leader election.   Below is a list of use cases:

# If broker_id 1 is a swapped failed host and brought up with last segments or 
latest offset without historical data (There is another effort on this), it's 
better for it to not serve leadership till it's caught-up.

# The cross-data center cluster has AWS instances which 

[jira] [Created] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread GEORGE LI (JIRA)
GEORGE LI created KAFKA-8638:


 Summary: Preferred Leader Blacklist (deprioritized list)
 Key: KAFKA-8638
 URL: https://issues.apache.org/jira/browse/KAFKA-8638
 Project: Kafka
  Issue Type: Improvement
  Components: config, controller, core
Affects Versions: 2.2.1, 2.3.0, 1.1.1
Reporter: GEORGE LI
Assignee: GEORGE LI


Currently, the Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order when the broker is in ISR.  
The preferred leader is the broker id in the first position of the replica list.  
There are use cases where, even when the first broker in the replica assignment 
is in ISR, there is a need for it to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. 

Let's use topic/partition replica (1,2,3) as an example; 1 is the preferred 
leader.  When preferred leader election is run, it will pick 1 as the leader if 
it's in ISR; if 1 is not online and in ISR, then pick 2; if 2 is not in ISR, 
then pick 3 as the leader.  There are use cases where, even when 1 is in ISR, we 
would like it to be moved to the end of the ordering (lowest priority) when 
deciding leadership during preferred leader election.  Below is a list of use 
cases:

# If broker_id 1 is a swapped failed host brought up with the last segments or 
the latest offset without historical data (there is another effort on this), 
it's better for it to not serve leadership till it's caught up.

# The cross-data-center cluster has AWS instances which have less computing 
power than the on-prem bare metal machines.  We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas. 

# If broker_id 1 is constantly losing leadership after some time ("flapping"), 
we would want to exclude 1 from being a leader unless all other brokers of this 
topic/partition are offline.  The "flapping" effect was seen in the past when 2 
or more brokers were bad: when they lost leadership constantly/quickly, the sets 
of partition replicas they belong to would see leadership constantly changing.  
The ultimate solution is to swap these bad hosts, but for quick mitigation we 
can also put the bad hosts in the Preferred Leader Blacklist to move their 
priority of being elected as leaders to the lowest. 

# If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaders to other brokers to 
lower its CPU load.  Currently, bouncing to lose leadership would not work for 
the controller, because after the bounce, the controller fails over to another 
broker.

# Avoid bouncing a broker in order to make it lose its leadership: it would be 
good if we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering by 
reassignments, even though that's quick), and then run preferred leader 
election.  A bouncing broker will cause temporary URP, and sometimes other 
issues.  Also, a bounce of a broker (e.g. broker_id 1) can temporarily remove 
all its leadership, but if another broker (e.g. broker_id 2) fails or gets 
bounced, some of its leaderships will likely fail over to broker_id 1 on a 
replica set with 3 brokers.  If broker_id 1 is in the blacklist, then in such a 
scenario, even with broker_id 2 offline, the 3rd broker can take leadership. 


The current workaround for the above is to change the topic/partition's replica 
assignment to move broker_id 1 from the first position to the last position and 
run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1).  This changes the 
replica assignment, and we need to keep track of the original one and restore it 
if things change (e.g. the controller fails over to another broker, or the 
swapped empty broker catches up).  That's a rather tedious task.
 





[jira] [Commented] (KAFKA-8527) add dynamic maintenance broker config

2019-06-20 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868914#comment-16868914
 ] 

GEORGE LI commented on KAFKA-8527:
--

We had this implemented in a separate admin service outside of Kafka.  One of 
its functions is topic-related changes, e.g. topic creation and topic partition 
expansion.  There is a ZK path/node which indicates which broker_ids are under 
"maintenance", and thus when creating a topic or expanding topic partitions, it 
will exclude these live broker_ids from getting any new replica assignments. 
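
A minimal sketch of the exclusion step (illustrative names only, not the actual 
assignment code): the candidate broker set used for new replica assignment 
simply drops any broker ids flagged as under maintenance.

{code}
// Brokers flagged as "maintenance" are still live but excluded from new assignments.
def eligibleBrokers(liveBrokers: Set[Int], maintenanceBrokers: Set[Int]): Set[Int] =
  liveBrokers.diff(maintenanceBrokers)

// eligibleBrokers(Set(1, 2, 3, 4), maintenanceBrokers = Set(3)) == Set(1, 2, 4)
{code}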

> add dynamic maintenance broker config
> -
>
> Key: KAFKA-8527
> URL: https://issues.apache.org/jira/browse/KAFKA-8527
> Project: Kafka
>  Issue Type: Improvement
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> Before we remove a broker for maintenance, we want to remove all partitions 
> out of the broker first to avoid introducing new Under Replicated Partitions 
> (URPs) . That is because shutting down (or killing) a broker that still hosts 
> live partitions will lead to temporarily reduced replicas of those 
> partitions. Moving partitions out of a broker can be done via partition 
> reassignment.  However, during the partition reassignment process, new topics 
> can be created by Kafka and thereby new partitions can be added to the broker 
> that is pending for removal. As a result, the removal process will need to 
> recursively moving new topic partitions out of the maintenance broker. In a 
> production environment in which topic creation is frequent and URP causing by 
> broker removal cannot be tolerated, the removal process can take multiple 
> iterations to complete the partition reassignment.  We want to provide a 
> mechanism to mask a broker as maintenance broker (Via Cluster Level Dynamic 
> configuration). One action Kafka can take for the maintenance broker is not 
> to assign new topic partitions to it, and thereby facilitate the broker 
> removal.





[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2019-02-25 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777257#comment-16777257
 ] 

GEORGE LI commented on KAFKA-6794:
--

Hi [~viktorsomogyi],  

I think this "Incremental Reassignment"  is different from KIP-236  "Planned 
Future Change" section.  That one is basically trying to overcome the current 
limitation that only one batch of reassignments can be run in 
/admin/reassign_partitions. 

e.g.  50 reassignments in a batch submitted,   49 completed.  and there is one 
long running reassignment pending in /admin/reassign_partitions,  Currently,  
not able to submit new batch until all in  /admin/reassign_partitions are 
completed and the node is removed from ZK.   If the cluster is pretty much 
idle,  this pretty much waste the resource for not able to submit new 
reassignments. 

The proposal is to enable submit new batch to a queue (ZK node),  and merge the 
new assignments to /admin/reassign_partitions.   This will try to use the  
Cancel Reassignments if there is conflict (same topic/partition) in both the 
new queue and the current /admin/reassign_partitions .

 

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.





[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment

2019-02-25 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777149#comment-16777149
 ] 

GEORGE LI edited comment on KAFKA-6794 at 2/25/19 7:56 PM:
---

Hi, [~viktorsomogyi], 

When I rebalance the whole cluster, I generate the reassignment plan JSON with 
a list of topic/partitions and their new_replicas/original_replicas, and sort 
them by size, trying to group them into batches of similar sizes for execution, 
so that they are expected to complete reassignment in about the same amount of 
time. 

Say there are 1000 reassignments and 50 per batch.  That will be at least 20 
batches/buckets to put in for execution; there could be more than 20 batches 
because a reassignment like (1,2,3,4) => (5,6,7,8) can be split into 4 
reassignments in 4 batches.  A batch will be submitted, and an execution program 
will keep checking the existence of /admin/reassign_partitions before submitting 
the next batch. 

Comparing the new_replicas vs. original_replicas sets, the algorithm can detect 
whether there is more than 1 new replica in new_replicas; if yes, it breaks the 
reassignment up and puts the pieces into different batches/buckets.  There are 
other considerations for the reassignments in the same batch: e.g. for different 
topic/partitions, try to spread the load so as not to overwhelm a leader; the 
leadership bytes within the same batch of reassignments should be balanced 
across all brokers/leaders in the cluster as much as possible.  Same for new 
followers (spread across the cluster so as not to overwhelm a particular 
follower). 

I think this (optimal execution of reassignment plans in batches) can be 
achieved outside of Kafka.  




was (Author: sql_consulting):
Hi, [~viktorsomogyi], 

When I rebalance the whole cluster,  I generate the reassignment plan json with 
 a list of  topic/partitions with its  new_replicas/original_replicas,  and 
sort them by their size, so try to group them in batches of similar sizes for 
execution,  so that they are expected to complete reassignment using about  the 
same amount of time.  

Say there are  1000 reassignments, and 50 per batch.  That will be at least 20 
batches/buckets to put in for execution.  Comparing the new_replicas Vs. 
original_replicas  set,  the algorithm can detect if there is more than 1 new 
replica in the new_replicas, if yes, then break it and put in different 
batch/bucket.There are other considerations of  the reassignments in the 
same batch:  e.g.  for different topic/partition,  try to spread the load and 
not to overwhelm a Leader.  e.g.  the Leadership bytes within the same batch 
for reassignments should be balanced across all brokers/leaders in the cluster 
as much as possible.  I think this (optimal executions of reassignment plans in 
batches) can only be achieved  outside of Kafka.  



> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.





[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2019-02-25 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777149#comment-16777149
 ] 

GEORGE LI commented on KAFKA-6794:
--

Hi, [~viktorsomogyi], 

When I rebalance the whole cluster, I generate the reassignment plan JSON with 
a list of topic/partitions and their new_replicas/original_replicas, and sort 
them by size, trying to group them into batches of similar sizes for execution, 
so that they are expected to complete reassignment in about the same amount of 
time. 

Say there are 1000 reassignments and 50 per batch.  That will be at least 20 
batches/buckets to put in for execution.  Comparing the new_replicas vs. 
original_replicas sets, the algorithm can detect whether there is more than 1 
new replica in new_replicas; if yes, it breaks the reassignment up and puts the 
pieces into different batches/buckets.  There are other considerations for the 
reassignments in the same batch: e.g. for different topic/partitions, try to 
spread the load so as not to overwhelm a leader; the leadership bytes within the 
same batch of reassignments should be balanced across all brokers/leaders in the 
cluster as much as possible.  I think this (optimal execution of reassignment 
plans in batches) can only be achieved outside of Kafka.  



> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.





[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment

2019-02-22 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775598#comment-16775598
 ] 

GEORGE LI edited comment on KAFKA-6794 at 2/22/19 9:21 PM:
---

I have also seen this issue.  When more than one broker is in the new replicas 
of the reassignment and the topic is big, even with a throttle, the leader works 
hard to sync to all the extra followers, which can cause a latency jump. 

One of the solutions is to execute the reassignment plans in an "optimal" way: 
submit the reassignment plans in batches, making sure that in each batch a 
topic/partition has only one extra new broker in its new replicas; wait till 
that reassignment completes, then submit another one.  e.g. if the reassignment 
is (1,2,3,4) => (5,6,7,8), split it into 4 batches (buckets), every batch with 
only 1 new replica:

{{Batch 1:  (1,2,3,5)}}

{{Batch 2:  (1,2,5,6)}}

{{Batch 3:  (1,5,6,7)}}

{{Batch 4:  (5,6,7,8)}}

Between each batch, check whether the ZK node /admin/reassign_partitions exists; 
if yes, sleep and check again; if not, submit the next batch. 

 

 


> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2019-02-22 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775598#comment-16775598
 ] 

GEORGE LI commented on KAFKA-6794:
--

I have also seen this issue.  When more than one broker is new in the reassignment's 
replica set and the topic is big, even with a throttle the leader works hard to sync 
all the extra followers, which can cause a latency jump. 

One of the solutions is to execute the reassignment plans in an "optimal" way: submit 
the reassignment plans in batches, making sure that within each batch a topic/partition 
has only one extra new broker in its new replicas, wait until that reassignment 
completes, then submit the next one.  e.g. if the reassignment is (1,2,3,4) => 
(5,6,7,8), split it into 4 batches (buckets), each batch adding only 1 new replica:

 

{{Batch 1:  (1,2,3,5)}}

{{Batch 2:  (1,2,5,6)}}

{{Batch 3:  (1,5,6,7)}}

{{Batch 4:  (5,6,7,8)}}

 

Between batches, check whether the ZK node /admin/reassign_partitions still exists; if 
it does, sleep and check again; if not, submit the next batch. 

 

 

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7854) Behavior change in controller picking up partition reassignment tasks since 1.1.0

2019-02-20 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773395#comment-16773395
 ] 

GEORGE LI commented on KAFKA-7854:
--

[KIP-236|https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment]
  is addressing the cancel/rollback of pending reassignments.    

WIP:  KAFKA-6359

Github Pull Request#:  [6296|https://github.com/apache/kafka/pull/6296]

> Behavior change in controller picking up partition reassignment tasks since 
> 1.1.0
> -
>
> Key: KAFKA-7854
> URL: https://issues.apache.org/jira/browse/KAFKA-7854
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Zhanxiang (Patrick) Huang
>Priority: Major
>
> After [https://github.com/apache/kafka/pull/4143], the controller no longer 
> subscribes to data changes on /admin/reassign_partitions (in order to avoid 
> unnecessarily reloading the reassignment data after the controller updates the 
> znode), as opposed to previous kafka versions. However, there are systems built 
> around kafka that rely on the previous behavior to incrementally update the list 
> of partition reassignments, since kafka does not natively support that.
>  
> For example, [cruise control|https://github.com/linkedin/cruise-control] can 
> rely on the previous behavior (controller listening to data changes) to 
> maintain the reassignment concurrency by dynamically updating the data in the 
> reassignment znode instead of waiting for the current batch to finish and 
> doing reassignment batch by batch, which can significantly reduce the 
> rebalance time in production clusters. Although directly updating the znode 
> can somehow be viewed as an anti-pattern in the long term, this is necessary 
> since kafka does not natively support incrementally submitting more reassignment 
> tasks. However, after our kafka clusters migrated from 0.11 to 2.0, cruise 
> control no longer works because the controller behavior has changed. This 
> reveals the following problems:
> * These behavior changes may be viewed as internal changes, so compatibility 
> is not guaranteed, but I think by convention people do view these as public 
> interfaces and rely on their compatibility. In this case, I think we should 
> clearly document the data contract for the partition reassignment task to 
> avoid misuse and avoid making controller changes that break the defined data 
> contract. There may be other cases (e.g. topic deletion) whose data contracts 
> need to be clearly defined, and we should keep that in mind when making 
> controller changes.
> * Kafka does not natively support incrementally submitting more reassignment 
> tasks. If we do want to support that nicely, we should consider changing how we 
> store the reassignment data: store it in child nodes and let the 
> controller listen on child node changes, similar to what we do for 
> /admin/delete_topics.
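
For illustration, a rough sketch of what the child-znode layout suggested in the last 
bullet might look like, using a kazoo client.  The parent path, payload format and 
function names are hypothetical, not anything Kafka actually uses.

{code}
# Hypothetical sketch of a child-znode layout for reassignment tasks (this is
# NOT how Kafka stores reassignments).  Each task becomes its own sequential
# child node, and a children watch picks up newly submitted tasks.
import json

from kazoo.client import KazooClient

zk = KazooClient(hosts="zk-host:2181")   # placeholder host
zk.start()

PARENT = "/admin/reassignment_tasks"     # hypothetical parent path
zk.ensure_path(PARENT)

def submit_task(partitions):
    """Submit one reassignment task as its own sequential child znode."""
    payload = json.dumps({"version": 1, "partitions": partitions}).encode()
    return zk.create(PARENT + "/task-", payload, sequence=True)

@zk.ChildrenWatch(PARENT)
def on_new_tasks(children):
    # A controller-like consumer would pick up new tasks here; this sketch
    # just prints the currently pending task nodes.
    print("pending reassignment tasks:", sorted(children))

submit_task([{"topic": "t", "partition": 0, "replicas": [5, 2, 3, 4]}])
{code}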



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned

2019-02-20 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773394#comment-16773394
 ] 

GEORGE LI commented on KAFKA-6304:
--

[KIP-236|https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment]
  is addressing the cancel/rollback of pending reassignments.    

WIP:  KAFKA-6359

Github Pull Request#:  [6296|https://github.com/apache/kafka/pull/6296]

> The controller should allow updating the partition reassignment for the 
> partitions being reassigned
> ---
>
> Key: KAFKA-6304
> URL: https://issues.apache.org/jira/browse/KAFKA-6304
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Priority: Major
>
> Currently the controller will not process the partition reassignment of a 
> partition if the partition is already being reassigned.
> The issue is that if there is a broker failure during the partition 
> reassignment, the reassignment may never finish, and the users may 
> want to cancel it. However, the controller will 
> refuse to do that unless the user manually deletes the partition reassignment zk 
> path, forces a controller switch and then issues the revert command. This is 
> pretty involved. It seems reasonable for the controller to abandon the 
> ongoing stuck reassignment and replace it with the updated partition 
> assignment.
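
For context, a minimal sketch of the manual cancellation procedure described in the 
issue (delete the reassignment znode, force a controller switch, then submit the revert 
plan).  It assumes a kazoo client and the stock kafka-reassign-partitions.sh tool; host 
and file names are placeholders, and this is not an official tool.

{code}
# Sketch of the manual cancellation steps described above (pre-KIP-236).
import subprocess

from kazoo.client import KazooClient

ZK = "zk-host:2181"                      # placeholder host

zk = KazooClient(hosts=ZK)
zk.start()
try:
    # 1. drop the stuck reassignment request
    if zk.exists("/admin/reassign_partitions"):
        zk.delete("/admin/reassign_partitions")
    # 2. force a controller switch so the new controller forgets the in-flight state
    if zk.exists("/controller"):
        zk.delete("/controller")
finally:
    zk.stop()

# 3. once a new controller is elected, submit the revert plan (the original assignment)
subprocess.run(
    ["kafka-reassign-partitions.sh",
     "--zookeeper", ZK,
     "--reassignment-json-file", "revert-plan.json",
     "--execute"],
    check=True)
{code}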



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6359) Work for KIP-236

2019-02-20 Thread GEORGE LI (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773351#comment-16773351
 ] 

GEORGE LI commented on KAFKA-6359:
--

New Pull Request:   https://github.com/apache/kafka/pull/6296

> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: GEORGE LI
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)