Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-12-01 Thread Guozhang Wang
Thanks! On Thu, Dec 1, 2016 at 5:17 AM, Hamidreza Afzali < hamidreza.afz...@hivestreaming.com> wrote: > I have added an example for KStreamDriver to the GitHub Gist and updated > the JIRA issue. > > https://issues.apache.org/jira/browse/KAFKA-4461 > >

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-12-01 Thread Hamidreza Afzali
I have added an example for KStreamDriver to the GitHub Gist and updated the JIRA issue. https://issues.apache.org/jira/browse/KAFKA-4461 https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13 Hamid

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-29 Thread Guozhang Wang
Hamid, Could you paste your code using KStreamDriver that does not have this issue into the JIRA as well? I suspect KStreamDriver should have the same issue and wondering why it did not. Guozhang On Tue, Nov 29, 2016 at 10:38 AM, Matthias J. Sax wrote: > Thanks! > > On

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-29 Thread Matthias J. Sax
Thanks! On 11/29/16 7:18 AM, Hamidreza Afzali wrote: > I have created a JIRA issue: > > https://issues.apache.org/jira/browse/KAFKA-4461 > > > Hamid > signature.asc Description: OpenPGP digital signature

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-29 Thread Hamidreza Afzali
I have created a JIRA issue: https://issues.apache.org/jira/browse/KAFKA-4461 Hamid

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-28 Thread Matthias J. Sax
Hamid, would you mind creating a Jira? Thanks. -Matthias On 11/28/16 9:36 AM, Guozhang Wang wrote: > Damian, Hamid: > > I looked at the source code and suspect that it is because of the > auto-repartitioning which causes the topology to not directly forward the > record to the child

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-24 Thread Hamidreza Afzali
Hi Damian, It processes correctly when using KStreamTestDriver. Best, Hamid

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-24 Thread Damian Guy
Hi Hamid, Out of interest - what are the results if you use KStreamTestDriver? Thanks, Damian On Thu, 24 Nov 2016 at 12:05 Hamidreza Afzali < hamidreza.afz...@hivestreaming.com> wrote: > The map() returns non-null keys and values and produces the following > stream: > >

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-23 Thread Matthias J. Sax
CACHE_MAX_BYTES_BUFFERING_CONFIG does not have any impact if you query the state. If you query it, you will always get the latest values. CACHE_MAX_BYTES_BUFFERING_CONFIG only effects the downstream KTable changelog stream (but you do not use this anyway). If I understand you correctly, if you

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-23 Thread Hamidreza Afzali
Thanks Matthias. Disabling the cache didn't solve the issue. Here's a sample code: https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13 The topology doesn't produce any result but it works when commenting out .map(...) in line 21. Thanks, Hamid

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-22 Thread Matthias J. Sax
In Kafka 0.10.1 a deduplication cache was introduced for aggregates, that reduces the downstream load for a KTable changelog stream. If you want to disable the cache for testing, you can set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to zero. Compare:

kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-22 Thread Hamidreza Afzali
Hi, When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of .map(...) and .groupByKey(...).count(...) does not produce any result. The topology looks like this: builder.stream(Serdes.String, Serdes.Integer, inputTopic) .map((k, v) => new KeyValue(fn(k), v))