Hi Caleb,

We have a benchmark that we run nightly to keep track of performance. Our 
numbers do indicate that consuming through Streams is slower than a plain 
consumer; however, the difference we see is not as large as the one you are 
observing. Would it be possible for you to run this benchmark in your 
environment and report the numbers? The benchmark is included with Kafka 
Streams and you can run it like this:

export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark

You'll need a Kafka broker to be running. The benchmark reports various 
numbers; among them are the throughput of a plain consumer and the throughput 
of Streams reading the same data.

Thanks
Eno

> On 9 Sep 2016, at 18:19, Caleb Welton <cewel...@yahoo.com.INVALID> wrote:
> 
> Same in both cases:
> client.id=Test-Prototype
> application.id=test-prototype
> 
> group.id=test-consumer-group
> bootstrap.servers=broker1:9092,broker2:9092
> zookeeper.connect=zk1:2181
> replication.factor=2
> 
> auto.offset.reset=earliest
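> 
> (For illustration, a minimal sketch of how these properties map onto
> StreamsConfig keys; this mirrors the list above and is not the exact code
> used:)
> 
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-prototype");      // application.id
> props.put(StreamsConfig.CLIENT_ID_CONFIG, "Test-Prototype");           // client.id
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         "broker1:9092,broker2:9092");                                  // bootstrap.servers
> props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zk1:2181");         // zookeeper.connect
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);                 // replication.factor
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        // auto.offset.reset
> // group.id is only used by the plain consumer; Streams derives its group
> // from application.id.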
> 
> 
> On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> 
> wrote:
> 
> 
> 
> Hi Caleb,
> 
> Could you share your Kafka Streams configuration (i.e., StreamsConfig
> properties you might have set before the test)?
> 
> Thanks
> Eno
> 
> 
> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:
> 
>> I have a question with respect to the KafkaStreams API.
>> 
>> I noticed during my prototyping work that my KafkaStreams application was
>> not able to keep up with the input on the stream, so I dug into it a bit and
>> found that it was spending an inordinate amount of time in
>> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
>> gun by itself, so I dropped the implementation down to a single processor
>> reading off a source.
>> 
>> import org.apache.kafka.streams.processor.AbstractProcessor;
>> 
>> // Counts records and prints throughput after every million records.
>> public class TestProcessor extends AbstractProcessor<String, String> {
>>    static long start = -1;
>>    static long count = 0;
>> 
>>    @Override
>>    public void process(String key, String value) {
>>        if (start < 0) {
>>            start = System.currentTimeMillis();
>>        }
>>        count += 1;
>>        if (count > 1000000) {
>>            long end = System.currentTimeMillis();
>>            double time = (end - start) / 1000.0;
>>            System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>>                    count, time, count / time);
>>            start = -1;
>>            count = 0;
>>        }
>>    }
>> 
>> }
>> 
>> ...
>> 
>> 
>> TopologyBuilder topologyBuilder = new TopologyBuilder();
>> topologyBuilder
>>        .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>>        .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>> 
>> 
>> 
>> I then ran this topology through the KafkaStreams API, and repeated the
>> test with the KafkaConsumer API.
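>> 
>> (Roughly, the two harnesses looked like this; a sketch with illustrative
>> names, not the exact code used. The Streams run starts the topology above,
>> and the consumer run drives the same TestProcessor from a plain poll loop:)
>> 
>> import java.util.Collections;
>> import java.util.Properties;
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>> import org.apache.kafka.streams.KafkaStreams;
>> import org.apache.kafka.streams.StreamsConfig;
>> 
>> // Streams run: hand the topology to KafkaStreams and start it.
>> Properties props = new Properties();
>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-prototype");
>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
>> // ...plus the other properties listed in this thread
>> KafkaStreams streams = new KafkaStreams(topologyBuilder, props);
>> streams.start();
>> 
>> // Consumer run: same processing logic, no Streams runtime in between.
>> Properties consumerProps = new Properties();
>> consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
>> consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
>> consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps,
>>         new StringDeserializer(), new StringDeserializer());
>> consumer.subscribe(Collections.singletonList("input"));
>> TestProcessor processor = new TestProcessor();
>> while (true) {
>>     for (ConsumerRecord<String, String> record : consumer.poll(100)) {
>>         processor.process(record.key(), record.value());
>>     }
>> }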
>> 
>> Using the KafkaConsumer API:
>> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
>> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
>> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
>> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>> 
>> Using the KafkaStreams API:
>> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
>> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
>> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
>> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>> 
>> 
>> The profile on the KafkaStreams consisted of:
>> 
>> 89.2% org.apache.kafka.common.network.Selector.select()
>> 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>> 
>> 
>> Is a 5x performance difference between the KafkaConsumer and the
>> KafkaStreams API expected?
>> 
>> Are there specific things I can do to diagnose/tune the system?
>> 
>> Thanks,
>>  Caleb
>> 
