Re: Performance issue with KafkaStreams

2016-09-19 Thread Guozhang Wang
Hello Caleb,

`./gradlew jar` should be sufficient to run the SimpleBenchmark. Could you
look into kafka-run-class.sh and see if "streams/build/libs/kafka-streams*.jar"
is added to the dependent path? In trunk it is added.
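
A quick self-contained sketch of that check (class name and messages are illustrative, not from the thread): it scans the running JVM's classpath for a kafka-streams jar, mirroring what kafka-run-class.sh is supposed to assemble.

```java
// Hedged sketch: check whether any kafka-streams jar is on the JVM classpath.
// The jar name pattern mirrors "streams/build/libs/kafka-streams*.jar" above;
// the class name and message strings are editor additions.
public class ClasspathCheck {
    public static void main(String[] args) {
        String cp = System.getProperty("java.class.path");
        boolean found = false;
        for (String entry : cp.split(java.io.File.pathSeparator)) {
            if (entry.contains("kafka-streams")) {
                found = true;
                break;
            }
        }
        System.out.println(found
                ? "kafka-streams jar is on the classpath"
                : "kafka-streams jar missing: run ./gradlew jar and check kafka-run-class.sh");
    }
}
```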


Guozhang



Re: Performance issue with KafkaStreams

2016-09-17 Thread Eno Thereska
Hi Caleb,

I usually do './gradlew installAll' first and that places all the jars in my 
local maven repo in ~/.m2/repository.

Eno


Re: Performance issue with KafkaStreams

2016-09-16 Thread Caleb Welton
Is there a specific way that I need to build kafka for that to work?

bash$  export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh
org.apache.kafka.streams.perf.SimpleBenchmark
Error: Could not find or load main class
org.apache.kafka.streams.perf.SimpleBenchmark

bash$ find . -name SimpleBenchmark.java
./streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

bash$  find . -name SimpleBenchmark.class

bash$ jar -tf streams/build/libs/kafka-streams-0.10.1.0-SNAPSHOT.jar | grep
SimpleBenchmark





Re: Performance issue with KafkaStreams

2016-09-11 Thread Eno Thereska
Hi Ara,

For parallelism you should have as many Kafka Streams instances as partitions.
The instances can run on different servers (each instance can have one thread).
For a single server, on the other hand, you can either start multiple
single-threaded instances on that server or a single instance with multiple
threads.
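
As a concrete (hypothetical) illustration of that sizing rule: assuming a 4-partition input topic and the Kafka Streams `num.stream.threads` property, the total thread count across all instances is matched to the partition count. Plain `java.util.Properties` is used so the sketch stands alone.

```java
import java.util.Properties;

// Illustrative sizing sketch for the rule above: total threads across all
// instances should match the partition count. All values are hypothetical.
public class ParallelismSketch {
    public static void main(String[] args) {
        int partitions = 4;          // partitions on the input topic
        int instances = 2;           // Streams instances (e.g. two servers)
        int threadsPerInstance = partitions / instances;

        Properties props = new Properties();
        props.put("application.id", "test-prototype");
        props.put("num.stream.threads", Integer.toString(threadsPerInstance));

        // 2 instances x 2 threads = 4 threads, one per partition
        System.out.println("num.stream.threads = " + props.getProperty("num.stream.threads"));
    }
}
```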

There is some more info at
http://docs.confluent.io/3.0.1/streams/architecture.html#parallelism-model
and also a blog post at
http://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

Thanks
Eno


> On 10 Sep 2016, at 22:01, Ara Ebrahimi  wrote:
> 
> Hi Eno,
> 
> Could you elaborate more on tuning Kafka Streaming applications? What are the 
> relationships between partitions and num.stream.threads num.consumer.fetchers 
> and other such parameters? On a single node setup with x partitions, what’s 
> the best way to make sure these partitions are consumed and processed by 
> kafka streams optimally?
> 
> Ara.
> 
>> On Sep 10, 2016, at 3:18 AM, Eno Thereska  wrote:
>> 
>> Hi Caleb,
>> 
>> We have a benchmark that we run nightly to keep track of performance. The 
>> numbers we have do indicate that consuming through streams is indeed slower 
>> than just a pure consumer, however the performance difference is not as 
>> large as you are observing. Would it be possible for you to run this 
>> benchmark in your environment and report the numbers? The benchmark is 
>> included with Kafka Streams and you run it like this:
>> 
>> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh 
>> org.apache.kafka.streams.perf.SimpleBenchmark
>> 
>> You'll need a Kafka broker to be running. The benchmark will report various 
>> numbers, but one of them is the performance of the consumer and the 
>> performance of streams reading.
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Sep 2016, at 18:19, Caleb Welton  wrote:
>>> 
>>> Same in both cases:
>>> client.id=Test-Prototype
>>> application.id=test-prototype
>>> 
>>> group.id=test-consumer-group
>>> bootstrap.servers=broker1:9092,broker2:9092zookeeper.connect=zk1:2181
>>> replication.factor=2
>>> 
>>> auto.offset.reset=earliest
>>> 
>>> 
>>> On Friday, September 9, 2016 8:48 AM, Eno Thereska  
>>> wrote:
>>> 
>>> 
>>> 
>>> Hi Caleb,
>>> 
>>> Could you share your Kafka Streams configuration (i.e., StreamsConfig
>>> properties you might have set before the test)?
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton  wrote:
>>> 
 I have a question with respect to the KafkaStreams API.
 
 I noticed during my prototyping work that my KafkaStreams application was
 not able to keep up with the input on the stream so I dug into it a bit and
 found that it was spending an inordinate amount of time in
 org.apache.kafka.common.network.Seloctor.select().  Not exactly a shooting
 gun itself, so I dropped the implementation down to a single processor
 reading off a source.
 
 public class TestProcessor extends AbstractProcessor {
  static long start = -1;
  static long count = 0;
 
  @Override
  public void process(String key, String value) {
  if (start < 0) {
  start = System.currentTimeMillis();
  }
  count += 1;
  if (count > 100) {
  long end = System.currentTimeMillis();
  double time = (end-start)/1000.0;
  System.out.printf("Processed %d records in %f seconds (%f
 records/s)\n", count, time, count/time);
  start = -1;
  count = 0;
  }
 
  }
 
 }
 
 ...
 
 
 TopologyBuilder topologyBuilder = new TopologyBuilder();
 topologyBuilder
  .addSource("SOURCE", stringDeserializer, StringDeserializer,
 "input")
  .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
 
 
 
 Which I then ran through the KafkaStreams API, and then repeated with
 the KafkaConsumer API.
 
 Using the KafkaConsumer API:
 Processed 101 records in 1.79 seconds (558659.776536 records/s)
 Processed 101 records in 1.229000 seconds (813670.463792 records/s)
 Processed 101 records in 1.106000 seconds (904160.036166 records/s)
 Processed 101 records in 1.19 seconds (840336.974790 records/s)
 
 Using the KafkaStreams API:
 Processed 101 records in 6.407000 seconds (156079.444358 records/s)
 Processed 101 records in 5.256000 seconds (190258.942161 records/s)
 Processed 101 records in 5.141000 seconds (194514.880373 records/s)
 Processed 101 records in 5.111000 seconds (195656.622970 records/s)
 
 
 The profile on the KafkaStreams consisted of:
 
 89.2% org.apache.kafka.common.network.Selector.select()
 7.6% org.apache.kafka.clients.producer.internals.
 ProduceRequestR

Re: Performance issue with KafkaStreams

2016-09-10 Thread Ara Ebrahimi
Hi Eno,

Could you elaborate more on tuning Kafka Streams applications? What are the
relationships between partitions, num.stream.threads, num.consumer.fetchers,
and other such parameters? On a single-node setup with x partitions, what’s the
best way to make sure these partitions are consumed and processed by Kafka
Streams optimally?

Ara.







This message is for the designated recipient only and may contain privileged,
proprietary, or otherwise confidential information. If you have received it
in error, please notify the sender immediately and delete the original. Any
other use of the e-mail by you is prohibited. Thank you in advance for your
cooperation.

Re: Performance issue with KafkaStreams

2016-09-10 Thread Eno Thereska
Hi Caleb,

We have a benchmark that we run nightly to keep track of performance. The 
numbers we have do indicate that consuming through streams is indeed slower 
than just a pure consumer, however the performance difference is not as large 
as you are observing. Would it be possible for you to run this benchmark in 
your environment and report the numbers? The benchmark is included with Kafka 
Streams and you run it like this:

export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh 
org.apache.kafka.streams.perf.SimpleBenchmark

You'll need a Kafka broker to be running. The benchmark will report various 
numbers, but one of them is the performance of the consumer and the performance 
of streams reading.

Thanks
Eno
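
For context, a quick arithmetic check on the throughput figures reported elsewhere in this thread (averaging Caleb's four runs per API) puts the gap at roughly 4x rather than 5x:

```java
// Averages the per-run throughput numbers Caleb posted in this thread
// and computes the consumer-vs-streams slowdown factor.
public class GapMath {
    public static void main(String[] args) {
        double[] consumer = {558659.776536, 813670.463792, 904160.036166, 840336.974790};
        double[] streams  = {156079.444358, 190258.942161, 194514.880373, 195656.622970};
        double c = avg(consumer), s = avg(streams);
        // prints: consumer ~779207 rec/s, streams ~184127 rec/s, ratio 4.2x
        System.out.printf("consumer ~%.0f rec/s, streams ~%.0f rec/s, ratio %.1fx%n", c, s, c / s);
    }

    static double avg(double[] xs) {
        double sum = 0;
        for (double x : xs) sum += x;
        return sum / xs.length;
    }
}
```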




Re: Performance issue with KafkaStreams

2016-09-09 Thread Caleb Welton
Same in both cases:
client.id=Test-Prototype
application.id=test-prototype

group.id=test-consumer-group
bootstrap.servers=broker1:9092,broker2:9092
zookeeper.connect=zk1:2181
replication.factor=2

auto.offset.reset=earliest
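
A small self-contained sketch that loads the settings above as `java.util.Properties` (hostnames are the illustrative ones from the message; no Kafka dependency is assumed):

```java
import java.io.StringReader;
import java.util.Properties;

// Loads Caleb's settings via java.util.Properties. Keys are copied from
// the message above; hosts are the same illustrative placeholders.
public class ConfigSketch {
    public static void main(String[] args) throws Exception {
        String cfg = String.join("\n",
                "client.id=Test-Prototype",
                "application.id=test-prototype",
                "group.id=test-consumer-group",
                "bootstrap.servers=broker1:9092,broker2:9092",
                "zookeeper.connect=zk1:2181",
                "replication.factor=2",
                "auto.offset.reset=earliest");

        Properties props = new Properties();
        props.load(new StringReader(cfg));
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
        System.out.println("loaded " + props.size() + " properties");  // loaded 7 properties
    }
}
```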




Re: Performance issue with KafkaStreams

2016-09-09 Thread Eno Thereska
Hi Caleb,

Could you share your Kafka Streams configuration (i.e., StreamsConfig
properties you might have set before the test)?

Thanks
Eno

On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton  wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream, so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
> gun by itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
>     static long start = -1;
>     static long count = 0;
>
>     @Override
>     public void process(String key, String value) {
>         if (start < 0) {
>             start = System.currentTimeMillis();
>         }
>         count += 1;
>         if (count > 1000000) {
>             long end = System.currentTimeMillis();
>             double time = (end - start) / 1000.0;
>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>                     count, time, count / time);
>             start = -1;
>             count = 0;
>         }
>     }
> }
>
> ...
>
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
>     .addSource("SOURCE", stringDeserializer, stringDeserializer,
>         "input")
>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
>
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
>
> The profile on the KafkaStreams run consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
>
> Is a 5X performance difference between Kafka Consumer and the
> KafkaStreams api expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
>   Caleb
>
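
The timing harness in Caleb's TestProcessor above can be exercised without a broker. A self-contained rework (the class name and the synthetic driver loop are editor additions; the counter/timer logic is the same) lets the measurement code be sanity-checked on its own, so any numbers it prints reflect only the local machine, not Kafka:

```java
// Self-contained rework of the TestProcessor timing logic (no Kafka
// dependency): feed synthetic records through the same counter/timer
// and report records/s for each window of 1,000,000 records.
public class ThroughputProbe {
    static long start = -1;
    static long count = 0;
    static final long WINDOW = 1_000_000;

    static void process(String key, String value) {
        if (start < 0) {
            start = System.currentTimeMillis();
        }
        count += 1;
        if (count > WINDOW) {
            long end = System.currentTimeMillis();
            double time = (end - start) / 1000.0;
            System.out.printf("Processed %d records in %f seconds (%f records/s)%n",
                    count, time, count / time);
            start = -1;
            count = 0;
        }
    }

    public static void main(String[] args) {
        // Drive exactly one reporting window's worth of synthetic records.
        for (int i = 0; i < 1_000_001; i++) {
            process("key", "value");
        }
    }
}
```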