[jira] [Updated] (KAFKA-3493) Replica fetcher load is not balanced over fetcher threads

2016-10-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3493:
---
Fix Version/s: (was: 0.10.0.2)
   0.10.2.0

> Replica fetcher load is not balanced over fetcher threads
> -
>
> Key: KAFKA-3493
> URL: https://issues.apache.org/jira/browse/KAFKA-3493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
> Fix For: 0.10.2.0
>
>
> The replicas are not evenly distributed among the fetcher threads. This has 
> caused some fetcher threads to become overloaded and hence their requests time 
> out frequently. This is especially a big issue when a new node is added to the 
> cluster and the fetch traffic is high. 
> Here is an example run in a test cluster with 10 brokers and 6 fetcher 
> threads (per source broker). A single topic consisting of 500+ partitions was 
> assigned to have a replica of each partition on the newly added broker.
> {code}
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 85
> 83
> 85
> 83
> 85
> 85
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 15
> 1
> 13
> 1
> 14
> 1
> {code}
> The problem is that the AbstractFetcherManager::getFetcherId method does not 
> take the broker id into account:
> {code}
>   private def getFetcherId(topic: String, partitionId: Int) : Int = {
>     Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>   }
> {code}
> Hence, although the replicas are evenly distributed among the fetcher ids 
> when aggregated across all source brokers, the distribution is not necessarily 
> even for each source broker separately. 
> I think a random function would do a much better job in distributing the load 
> over the fetcher threads from each source broker.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3493) Replica fetcher load is not balanced over fetcher threads

2016-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3493:
---
Fix Version/s: (was: 0.10.0.1)
   0.10.0.2

> Replica fetcher load is not balanced over fetcher threads
> -
>
> Key: KAFKA-3493
> URL: https://issues.apache.org/jira/browse/KAFKA-3493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
> Fix For: 0.10.0.2
>
>
> The replicas are not evenly distributed among the fetcher threads. This has 
> caused some fetcher threads to become overloaded and hence their requests time 
> out frequently. This is especially a big issue when a new node is added to the 
> cluster and the fetch traffic is high. 
> Here is an example run in a test cluster with 10 brokers and 6 fetcher 
> threads (per source broker). A single topic consisting of 500+ partitions was 
> assigned to have a replica of each partition on the newly added broker.
> {code}
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 85
> 83
> 85
> 83
> 85
> 85
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 15
> 1
> 13
> 1
> 14
> 1
> {code}
> The problem is that the AbstractFetcherManager::getFetcherId method does not 
> take the broker id into account:
> {code}
>   private def getFetcherId(topic: String, partitionId: Int) : Int = {
>     Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>   }
> {code}
> Hence, although the replicas are evenly distributed among the fetcher ids 
> when aggregated across all source brokers, the distribution is not necessarily 
> even for each source broker separately. 
> I think a random function would do a much better job in distributing the load 
> over the fetcher threads from each source broker.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3493) Replica fetcher load is not balanced over fetcher threads

2016-04-01 Thread Maysam Yabandeh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maysam Yabandeh updated KAFKA-3493:
---
Description: 
The replicas are not evenly distributed among the fetcher threads. This has 
caused some fetcher threads to become overloaded and hence their requests time 
out frequently. This is especially a big issue when a new node is added to the 
cluster and the fetch traffic is high. 

Here is an example run in a test cluster with 10 brokers and 6 fetcher threads 
(per source broker). A single topic consisting of 500+ partitions was assigned 
to have a replica of each partition on the newly added broker.

{code}
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
85
83
85
83
85
85
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
15
1
13
1
14
1
{code}

The problem is that the AbstractFetcherManager::getFetcherId method does not 
take the broker id into account:
{code}
  private def getFetcherId(topic: String, partitionId: Int) : Int = {
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
  }
{code}
Hence, although the replicas are evenly distributed among the fetcher ids when 
aggregated across all source brokers, the distribution is not necessarily even 
for each source broker separately. 
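
For illustration, here is a small standalone sketch of why the skew looks the 
way it does. This is not Kafka code: it uses math.abs in place of Utils.abs, a 
made-up topic name, and assumes a round-robin leader assignment over the 10 
brokers, so that one source broker leads roughly every 10th partition.

{code}
object FetcherIdSkew {
  // Mirrors the formula above, with the 6 fetcher threads from the example.
  val numFetchers = 6

  def fetcherId(topic: String, partitionId: Int): Int =
    math.abs(31 * topic.hashCode() + partitionId) % numFetchers

  def main(args: Array[String]): Unit = {
    val topic = "example-topic"              // hypothetical topic name
    // Partitions whose leader sits on one particular source broker, assuming
    // a round-robin leader assignment over 10 brokers.
    val partitionsFromOneBroker = 2 until 500 by 10
    val perFetcher = partitionsFromOneBroker
      .groupBy(p => fetcherId(topic, p))
      .map { case (id, ps) => id -> ps.size }
    // Because the step (10) shares a factor with numFetchers (6), only 3 of
    // the 6 fetcher ids ever receive partitions from this source broker.
    println(perFetcher)
  }
}
{code}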

I think a random function would do a much better job in distributing the load 
over the fetcher threads from each source broker.
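
As a rough sketch of one possible direction (not a tested patch, and not 
necessarily better than a random assignment), the source broker id could be 
mixed into a better-mixed hash, so that the mapping stays deterministic per 
partition but no longer collapses onto a few fetcher ids per source broker. The 
names below (a getFetcherId variant with a sourceBrokerId parameter, 
numFetchers) are illustrative, not the real signature:

{code}
import scala.util.hashing.MurmurHash3

object FetcherIdSketch {
  val numFetchers = 6  // matches num.replica.fetchers in the example above

  // Hypothetical variant: hash (topic, partition, source broker) together with
  // MurmurHash3 instead of the linear 31 * topic.hashCode() + partitionId form.
  def getFetcherId(topic: String, partitionId: Int, sourceBrokerId: Int): Int = {
    val mixed = MurmurHash3.productHash((topic, partitionId, sourceBrokerId))
    // floorMod keeps the result in [0, numFetchers) even for negative hashes.
    Math.floorMod(mixed, numFetchers)
  }

  def main(args: Array[String]): Unit = {
    // Same hypothetical scenario as above: every 10th partition fetched from broker 22.
    val ids = (2 until 500 by 10).map(p => getFetcherId("example-topic", p, sourceBrokerId = 22))
    // Counts now spread over the fetcher ids instead of collapsing onto 3 of them.
    println(ids.groupBy(identity).map { case (id, xs) => id -> xs.size })
  }
}
{code}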

Thoughts?

  was:
The replicas are not evenly distributed among the fetcher threads. This has 
caused some fetcher threads to become overloaded and hence their requests timeout 
frequently. This is especially a big issue when a new node is added to the 
cluster and the fetch traffic is large. 

Here is an example run in a test cluster with 10 brokers and 6 fetcher threads 
(per source broker). A single topic consisting of 500+ partitions was assigned 
to have a replica of each partition on the newly added broker.

{code}
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
85
83
85
83
85
85
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
15
1
13
1
14
1
{code}

The problem is that the AbstractFetcherManager::getFetcherId method does not 
take the broker id into account:
{code}
  private def getFetcherId(topic: String, partitionId: Int) : Int = {
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
  }
{code}
Hence, although the replicas are evenly distributed among the fetcher ids when 
aggregated across all source brokers, the distribution is not necessarily even 
for each source broker separately. 

I think a random function would do a much better job in distributing the load 
over the fetcher threads from each source broker.

Thoughts?


> Replica fetcher load is not balanced over fetcher threads
> -
>
> Key: KAFKA-3493
> URL: https://issues.apache.org/jira/browse/KAFKA-3493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
>
> The replicas are not evenly distributed among the fetcher threads. This has 
> caused some fetcher threads to become overloaded and hence their requests time 
> out frequently. This is especially a big issue when a new node is added to the 
> cluster and the fetch traffic is high. 
> Here is an example run in a test cluster with 10 brokers and 6 fetcher 
> threads (per source broker). A single topic consisting of 500+ partitions was 
> assigned to have a replica of each partition on the newly added broker.
> {code}
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 85
> 83
> 85
> 83
> 85
> 85
> [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
> 15
> 1
> 13
> 1
> 14
> 1
> {code}
> The problem is that the AbstractFetcherManager::getFetcherId method does not 
> take the broker id into account:
> {code}
>   private def getFetcherId(topic: String, partitionId: Int) : Int = {
>     Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>   }
> {code}
> Hence, although the replicas are evenly distributed among the fetcher ids 
> when aggregated across all source brokers, the distribution is not necessarily 
> even for each source broker separately.