[jira] [Updated] (KAFKA-3428) Remove metadata sync bottleneck from mirrormaker's producer

2016-09-27 Thread Ismael Juma (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3428:
---
Fix Version/s: 0.10.2.0  (was: 0.10.1.0)

> Remove metadata sync bottleneck from mirrormaker's producer
> ---
>
> Key: KAFKA-3428
> URL: https://issues.apache.org/jira/browse/KAFKA-3428
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
> Fix For: 0.10.2.0
>
>
> Due to sync on the single producer, MM in a setup with 32 consumer threads 
> could not send more than 358k msg/sec and hence could not saturate the NIC. 
> Profiling showed that producer.send takes 0.080 ms on average, which roughly 
> explains the 358k msg/sec ceiling (32 threads * 1 send / 0.080 ms ~= 400k msg/sec). 
> The following explains the bottleneck in producer.send and suggests how to improve it.
> Current impl of MM relies on a single producer. For EACH message, 
> producer.send() calls waitOnMetadata, which runs the following synchronized 
> code:
> {code}
> // add topic to metadata topic list if it is not there already.
> if (!this.metadata.containsTopic(topic))
>     this.metadata.add(topic);
> {code}
> Although this code is mostly a no-op, containsTopic is synchronized and 
> therefore becomes the bottleneck in MM.
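> For context, the relevant Metadata methods in the 0.9.x producer client look 
> roughly like the following (paraphrased shape, field types assumed, not copied 
> from the source); every MM thread's send funnels through the monitor of the 
> single shared Metadata instance:
> {code}
> // Paraphrased shape of org.apache.kafka.clients.Metadata (0.9.x); details assumed.
> private final Set<String> topics = new HashSet<String>();
> private Cluster cluster;
>
> public synchronized boolean containsTopic(String topic) {
>     return this.topics.contains(topic);   // cheap, but serialized on the Metadata monitor
> }
>
> public synchronized void add(String topic) {
>     this.topics.add(topic);
> }
> {code}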
> Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>   18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
> After replacing this bottleneck with a no-op, another run of the 
> profiler shows that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy   132 s (54 %)   n/a n/a
>   java.lang.Thread.run 50,776 ms (21 %)   n/a n/a
>   org.apache.kafka.clients.Metadata.fetch  20,881 ms (8 %)   n/a n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
> public synchronized Cluster fetch() {
>     return this.cluster;
> }
> {code}
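> One way to make an unsynchronized fetch() safe (an assumption about how the 
> change could be made, not necessarily what our patch does) is to publish the 
> cluster reference through a volatile field, so readers skip the lock while the 
> update path keeps it:
> {code}
> // Sketch (assumed): volatile gives safe publication of the Cluster reference,
> // so readers need no lock; only the metadata update path stays synchronized.
> private volatile Cluster cluster;
>
> public Cluster fetch() {          // synchronized removed
>     return this.cluster;          // volatile read
> }
>
> public synchronized void update(Cluster cluster, long now) {
>     this.cluster = cluster;       // volatile write publishes the new snapshot
>     // ... existing bookkeeping unchanged ...
> }
> {code}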
> Removing sync from the fetch method shows that the bottleneck disappears:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy   249 s (78 %)   n/a n/a
>   org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel   24,489 ms (7 %)   n/a n/a
>   org.xerial.snappy.SnappyNative.rawUncompress 17,024 ms (5 %)   n/a n/a
>   org.apache.kafka.clients.producer.internals.RecordAccumulator.append   13,817 ms (4 %)   n/a n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does 
> the following:
> 1. replace HashSet with a concurrent hash set
> 2. remove sync from containsTopic and fetch
> 3. pass a copy of the topics set to getClusterForCurrentTopics, since this 
> synchronized method accesses topics at two locations and topics being changed in 
> the middle might break the semantics (a rough sketch of these changes is shown below).
> Any interest in applying this patch? Any alternative suggestions?
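> Not the actual internal patch, but a minimal self-contained sketch of what 
> steps 1-3 could look like (the class name, the Cluster stand-in, and the extra 
> snapshot parameter on getClusterForCurrentTopics are hypothetical):
> {code}
> import java.util.Collections;
> import java.util.HashSet;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Hypothetical sketch of a patched Metadata; Cluster is a stand-in type here.
> public final class PatchedMetadataSketch {
>     public static final class Cluster { }
>
>     // 1. concurrent set: containsTopic/add no longer contend on the Metadata monitor
>     private final Set<String> topics =
>             Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
>
>     // volatile so the unsynchronized fetch() below still sees a safely published value
>     private volatile Cluster cluster = new Cluster();
>
>     // 2. sync removed from the read paths
>     public boolean containsTopic(String topic) { return topics.contains(topic); }
>     public void add(String topic) { topics.add(topic); }
>     public Cluster fetch() { return cluster; }
>
>     // 3. hand the (still synchronized) update path a snapshot copy of topics, so the
>     //    set cannot change between the two places getClusterForCurrentTopics reads it
>     public synchronized void update(Cluster fresh) {
>         this.cluster = getClusterForCurrentTopics(fresh, new HashSet<String>(topics));
>     }
>
>     private Cluster getClusterForCurrentTopics(Cluster fresh, Set<String> topicsSnapshot) {
>         // stand-in: the real method filters the cluster down to the given topics
>         return fresh;
>     }
> }
> {code}
> The copy in step 3 costs one small allocation per metadata update, which is rare 
> compared to the per-message send path it unblocks.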





[jira] [Updated] (KAFKA-3428) Remove metadata sync bottleneck from mirrormaker's producer

2016-07-26 Thread Ismael Juma (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3428:
---
Fix Version/s: 0.10.1.0  (was: 0.10.0.1)






[jira] [Updated] (KAFKA-3428) Remove metadata sync bottleneck from mirrormaker's producer

2016-03-22 Thread Ismael Juma (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3428:
---
Fix Version/s: 0.10.0.1  (was: 0.10.0.0)






[jira] [Updated] (KAFKA-3428) Remove metadata sync bottleneck from mirrormaker's producer

2016-03-21 Thread Maysam Yabandeh (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maysam Yabandeh updated KAFKA-3428:
---
 Reviewer: Ismael Juma
Fix Version/s: 0.10.0.0
Affects Version/s: 0.9.0.1
   Status: Patch Available  (was: Open)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)